jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java
author alanb
Sun, 15 Feb 2009 12:25:54 +0000
changeset 2057 3acf8e5e2ca0
child 2594 3755ecdb395d
permissions -rw-r--r--
6781363: New I/O: Update socket-channel API to jsr203/nio2-b99 4313887: New I/O: Improved filesystem interface 4607272: New I/O: Support asynchronous I/O Reviewed-by: sherman, chegar

/*
 * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.util.concurrent.*;
import java.nio.ByteBuffer;
import java.nio.BufferOverflowException;
import java.io.IOException;
import java.io.FileDescriptor;
import sun.misc.SharedSecrets;
import sun.misc.JavaIOFileDescriptorAccess;

/**
 * Windows implementation of AsynchronousFileChannel using overlapped I/O.
 */

public class WindowsAsynchronousFileChannelImpl
    extends AsynchronousFileChannelImpl
    implements Iocp.OverlappedChannel, Groupable
{
    // Accessor obtained from SharedSecrets; the constructor uses it to read
    // the native handle out of a java.io.FileDescriptor.
    private static final JavaIOFileDescriptorAccess fdAccess =
        SharedSecrets.getJavaIOFileDescriptorAccess();

    // error when EOF is detected asynchronously.
    // ReadTask.failed compares the reported error against this value and,
    // on a match, completes the read with -1 instead of failing it.
    private static final int ERROR_HANDLE_EOF = 38;

    // Holder class: the default I/O completion port is created by this
    // class's static initializer rather than by the enclosing class.
    private static class DefaultIocpHolder {
        static final Iocp defaultIocp = createDefaultIocp();

        /**
         * Creates and starts a completion port backed by the default thread
         * pool. An IOException is rethrown as an InternalError carrying the
         * original exception as its cause.
         */
        private static Iocp createDefaultIocp() {
            Iocp port;
            try {
                port = new Iocp(null, ThreadPool.createDefault()).start();
            } catch (IOException cause) {
                InternalError failure = new InternalError();
                failure.initCause(cause);
                throw failure;
            }
            return port;
        }
    }

    // Used for the force/truncate/size methods, and by tryLock/release for
    // the non-overlapped lock and unlock calls.
    private static final FileDispatcher nd = new FileDispatcherImpl();

    // The handle is extracted for use in native methods invoked from this class.
    private final long handle;

    // The key that identifies the channel's association with the I/O port
    // (returned by iocp.associate and passed to iocp.disassociate in close).
    private final int completionKey;

    // I/O completion port (group)
    private final Iocp iocp;

    // true when the channel uses the shared default port (open was called
    // with a null thread pool); when false, open created the port for this
    // channel and close() shuts it down.
    private final boolean isDefaultIocp;

    // Caches OVERLAPPED structure for each outstanding I/O operation
    private final PendingIoCache ioCache;


    /**
     * Creates a channel for the given file descriptor and associates its
     * native handle with the given I/O completion port.
     *
     * @param fdObj          file descriptor whose native handle is used for I/O
     * @param reading        passed through to the superclass constructor
     * @param writing        passed through to the superclass constructor
     * @param iocp           completion port to associate the handle with; its
     *                       executor is passed to the superclass constructor
     * @param isDefaultIocp  recorded so that close() knows whether the port
     *                       must be shut down together with the channel
     * @throws IOException   NOTE(review): presumably raised by
     *                       iocp.associate when the handle cannot be bound
     *                       to the port -- confirm against Iocp
     */
    private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
                                               boolean reading,
                                               boolean writing,
                                               Iocp iocp,
                                               boolean isDefaultIocp)
        throws IOException
    {
        super(fdObj, reading, writing, iocp.executor());
        this.handle = fdAccess.getHandle(fdObj);
        this.iocp = iocp;
        this.isDefaultIocp = isDefaultIocp;
        this.ioCache = new PendingIoCache();
        // done last: this passes "this" to the port, so every other field
        // is already assigned by the time the port learns about the channel
        this.completionKey = iocp.associate(this, handle);
    }

    /**
     * Opens an asynchronous file channel on the given file descriptor.
     *
     * When {@code pool} is {@code null} the channel is bound to the shared
     * default completion port; otherwise a new port is created and started
     * on the given thread pool. If constructing the channel fails with an
     * IOException, a port created here is closed again before the exception
     * is rethrown.
     *
     * @param fdo      file descriptor of the file
     * @param reading  passed through to the channel constructor
     * @param writing  passed through to the channel constructor
     * @param pool     thread pool for a dedicated port, or {@code null} to
     *                 use the default port
     * @return the new channel
     * @throws IOException if the port cannot be started or the channel
     *                     cannot be created
     */
    public static AsynchronousFileChannel open(FileDescriptor fdo,
                                               boolean reading,
                                               boolean writing,
                                               ThreadPool pool)
        throws IOException
    {
        // no pool supplied => share the default port
        final boolean isDefaultIocp = (pool == null);
        final Iocp iocp = isDefaultIocp
            ? DefaultIocpHolder.defaultIocp
            : new Iocp(null, pool).start();

        try {
            return new WindowsAsynchronousFileChannelImpl(
                fdo, reading, writing, iocp, isDefaultIocp);
        } catch (IOException x) {
            // error binding to port; a port created just for this channel
            // must not be left behind
            if (!isDefaultIocp) {
                iocp.implClose();
            }
            throw x;
        }
    }

    /**
     * Removes from the pending I/O cache, and returns, the result object
     * registered for the given OVERLAPPED structure.
     *
     * @param overlapped  value previously returned by {@code ioCache.add}
     * @return whatever {@code ioCache.remove} yields for that value
     */
    @Override
    public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
        final PendingFuture<V,A> pending = ioCache.remove(overlapped);
        return pending;
    }

    /**
     * Closes the channel. Only the first invocation does any work; later
     * invocations return immediately.
     *
     * The first invocation invalidates all locks held for the channel,
     * closes the file handle, closes the pending I/O cache, disassociates
     * the channel from the completion port and, when the port is not the
     * default port, shuts the port down.
     */
    @Override
    public void close() throws IOException {
        // flip the closed flag under the write lock
        final boolean alreadyClosed;
        closeLock.writeLock().lock();
        try {
            alreadyClosed = closed;
            if (!alreadyClosed) {
                closed = true;
            }
        } finally {
            closeLock.writeLock().unlock();
        }
        if (alreadyClosed) {
            return;
        }

        // invalidate all locks held for this channel
        invalidateAllLocks();

        // close the file
        close0(handle);

        // waits until all I/O operations have completed
        ioCache.close();

        // disassociate from port and shutdown thread pool if not default
        iocp.disassociate(completionKey);
        if (!isDefaultIocp) {
            iocp.shutdown();
        }
    }

    /**
     * Returns the I/O completion port that this channel is associated with.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return this.iocp;
    }

    /**
     * Maps an arbitrary Throwable to an IOException:
     * a ClosedChannelException becomes a new AsynchronousCloseException,
     * any other IOException is returned unchanged, and everything else is
     * wrapped in a new IOException with the original as its cause.
     */
    private static IOException toIOException(Throwable x) {
        if (!(x instanceof IOException)) {
            return new IOException(x);
        }
        if (x instanceof ClosedChannelException) {
            return new AsynchronousCloseException();
        }
        return (IOException) x;
    }

    /**
     * Returns the current size of the file as reported by the file
     * dispatcher.
     *
     * @throws IOException if begin() or the dispatcher call fails
     */
    @Override
    public long size() throws IOException {
        try {
            // begin() stays inside the try so that end() always runs
            begin();
            final long length = nd.size(fdObj);
            return length;
        } finally {
            end();
        }
    }

    /**
     * Truncates the file to the given size. Nothing is done when the given
     * size is greater than the current file size.
     *
     * @param size  the new size; must not be negative
     * @return this channel
     * @throws IllegalArgumentException     if {@code size} is negative
     * @throws NonWritableChannelException  if the channel is not writable
     * @throws IOException                  if begin() or a dispatcher call fails
     */
    @Override
    public AsynchronousFileChannel truncate(long size) throws IOException {
        if (size < 0) {
            throw new IllegalArgumentException("Negative size");
        }
        if (!writing) {
            throw new NonWritableChannelException();
        }
        try {
            begin();
            // only ever shrink (or keep) the file; never grow it
            if (size <= nd.size(fdObj)) {
                nd.truncate(fdObj, size);
            }
        } finally {
            end();
        }
        return this;
    }

    /**
     * Asks the file dispatcher to force updates to this channel's file.
     *
     * @param metaData  passed through unchanged to the dispatcher
     * @throws IOException if begin() or the dispatcher call fails
     */
    @Override
    public void force(boolean metaData) throws IOException {
        try {
            // begin() stays inside the try so that end() always runs
            begin();
            nd.force(fdObj, metaData);
        } finally {
            end();
        }
    }

    // -- file locking --

    /**
     * Task that initiates locking operation and handles completion result.
     *
     * run() issues the overlapped lock request; completed()/failed() are the
     * Iocp.ResultHandler callbacks for a request that did not finish inside
     * run(). NOTE(review): presumably those callbacks are invoked by the
     * completion port's handler thread -- confirm against Iocp.
     */
    private class LockTask<A> implements Runnable, Iocp.ResultHandler {
        // start of the region to lock (the region size and the shared flag
        // are read from fli)
        private final long position;
        // lock object already added to the file lock table by lock()
        private final FileLockImpl fli;
        // future that is completed with fli or with a failure
        private final PendingFuture<FileLock,A> result;

        LockTask(long position,
                 FileLockImpl fli,
                 PendingFuture<FileLock,A> result)
        {
            this.position = position;
            this.fli = fli;
            this.result = result;
        }

        /**
         * Initiates the lock request. If the native call reports the request
         * as pending this method returns without completing the result;
         * otherwise the result is completed here (success or failure) and
         * the completion handler is invoked.
         */
        @Override
        public void run() {
            long overlapped = 0L;
            try {
                begin();

                // allocate OVERLAPPED structure
                overlapped = ioCache.add(result);

                // synchronize on result to avoid race with handler thread
                // when lock is acquired immediately.
                synchronized (result) {
                    int n = lockFile(handle, position, fli.size(), fli.isShared(),
                                     overlapped);
                    if (n == IOStatus.UNAVAILABLE) {
                        // I/O is pending
                        return;
                    }
                    // acquired lock immediately
                    // NOTE(review): the OVERLAPPED entry is not removed from
                    // ioCache on this path; presumably the handler thread
                    // removes it via getByOverlapped -- confirm
                    result.setResult(fli);
                }

            } catch (Throwable x) {
                // lock failed or channel closed
                removeFromFileLockTable(fli);
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                result.setFailure(toIOException(x));
            } finally {
                // pairs with begin(); also runs on the early "pending" return
                end();
            }

            // invoke completion handler
            Invoker.invoke(result.handler(), result);
        }

        /**
         * Completion callback for a successful lock request.
         *
         * @param bytesTransferred  not used by this task
         */
        @Override
        public void completed(int bytesTransferred) {
            // release waiters and invoke completion handler
            result.setResult(fli);
            Invoker.invoke(result.handler(), result);
        }

        /**
         * Completion callback for a failed lock request.
         *
         * @param error  not used by this task
         * @param x      the failure to report while the channel is open
         */
        @Override
        public void failed(int error, IOException x) {
            // lock not acquired so remove from lock table
            removeFromFileLockTable(fli);

            // release waiters
            if (isOpen()) {
                result.setFailure(x);
            } else {
                // channel was closed in the meantime: report that instead
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invoke(result.handler(), result);
        }
    }

    /**
     * Initiates an asynchronous request to lock a region of the file.
     *
     * The lock is first recorded in the file lock table; the native lock
     * request itself is issued by a LockTask run on a thread of the
     * channel's thread pool.
     *
     * @param position    start of the region to lock
     * @param size        size of the region to lock
     * @param shared      {@code true} for a shared lock (requires a readable
     *                    channel), {@code false} for an exclusive lock
     *                    (requires a writable channel)
     * @param attachment  object handed to the completion handler
     * @param handler     completion handler, passed on to the result future
     * @return a future for the pending lock; an already-failed future
     *         (ClosedChannelException) when addToFileLockTable returns null
     * @throws NonReadableChannelException     if {@code shared} and the
     *         channel is not open for reading
     * @throws NonWritableChannelException     if not {@code shared} and the
     *         channel is not open for writing
     * @throws ShutdownChannelGroupException   if the task cannot be handed
     *         to the thread pool; the lock table entry is rolled back first
     */
    @Override
    public <A> Future<FileLock> lock(long position,
                                     long size,
                                     boolean shared,
                                     A attachment,
                                     CompletionHandler<FileLock,? super A> handler)
    {
        if (shared && !reading)
            throw new NonReadableChannelException();
        if (!shared && !writing)
            throw new NonWritableChannelException();

        // add to lock table
        FileLockImpl fli = addToFileLockTable(position, size, shared);
        if (fli == null) {
            // no entry could be added: report the channel as closed
            CompletedFuture<FileLock,A> result = CompletedFuture
                .withFailure(this, new ClosedChannelException(), attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        // create Future and task that will be invoked to acquire lock
        PendingFuture<FileLock,A> result =
            new PendingFuture<FileLock,A>(this, handler, attachment);
        // parameterized (not raw) so the task stays type-checked against A
        LockTask<A> lockTask = new LockTask<A>(position, fli, result);
        result.setContext(lockTask);

        // initiate I/O (can only be done from thread in thread pool)
        try {
            Invoker.invokeOnThreadInThreadPool(this, lockTask);
        } catch (ShutdownChannelGroupException e) {
            // rollback
            removeFromFileLockTable(fli);
            throw e;
        }
        return result;
    }

    // Result codes for the dispatcher's lock call. tryLock compares the
    // value returned by nd.lock against NO_LOCK; LOCKED is not referenced
    // elsewhere in this class.
    static final int NO_LOCK = -1;       // Failed to lock
    static final int LOCKED = 0;         // Obtained requested lock

    /**
     * Attempts to lock a region of the file without going through the
     * completion port.
     *
     * @param position  start of the region to lock
     * @param size      size of the region to lock
     * @param shared    {@code true} for a shared lock, {@code false} for an
     *                  exclusive lock
     * @return the lock, or {@code null} when the dispatcher reports NO_LOCK
     * @throws NonReadableChannelException  if {@code shared} and the channel
     *         is not open for reading
     * @throws NonWritableChannelException  if not {@code shared} and the
     *         channel is not open for writing
     * @throws ClosedChannelException       if no lock table entry could be
     *         added
     * @throws IOException                  if begin() or the dispatcher fails
     */
    @Override
    public FileLock tryLock(long position, long size, boolean shared)
        throws IOException
    {
        if (shared) {
            if (!reading) {
                throw new NonReadableChannelException();
            }
        } else if (!writing) {
            throw new NonWritableChannelException();
        }

        // add to lock table
        final FileLockImpl lockEntry = addToFileLockTable(position, size, shared);
        if (lockEntry == null) {
            throw new ClosedChannelException();
        }

        boolean acquired = false;
        try {
            begin();
            // try to acquire the lock
            acquired = (nd.lock(fdObj, false, position, size, shared) != NO_LOCK);
            return acquired ? lockEntry : null;
        } finally {
            // not acquired (or an exception was thrown): undo the table entry
            if (!acquired) {
                removeFromFileLockTable(lockEntry);
            }
            end();
        }
    }

    // invoked by FileLockImpl to release lock:
    // unlocks the region through the dispatcher and then drops the lock
    // from the file lock table
    @Override
    void release(FileLockImpl fli) throws IOException {
        try {
            begin();
            final long start = fli.position();
            final long length = fli.size();
            nd.release(fdObj, start, length);
            removeFromFileLockTable(fli);
        } finally {
            end();
        }
    }

    /**
     * Task that initiates read operation and handles completion result.
     *
     * run() issues the overlapped read; completed()/failed() are the
     * Iocp.ResultHandler callbacks for a read that did not finish inside
     * run(). NOTE(review): presumably those callbacks are invoked by the
     * completion port's handler thread -- confirm against Iocp.
     */
    private class ReadTask<A> implements Runnable, Iocp.ResultHandler {
        // caller's destination buffer
        private final ByteBuffer dst;
        private final int pos, rem;     // buffer position/remaining
        private final long position;    // file position
        // future completed with the byte count or with a failure
        private final PendingFuture<Integer,A> result;

        // set to dst if direct; otherwise set to substituted direct buffer
        private volatile ByteBuffer buf;

        ReadTask(ByteBuffer dst,
                 int pos,
                 int rem,
                 long position,
                 PendingFuture<Integer,A> result)
        {
            this.dst = dst;
            this.pos = pos;
            this.rem = rem;
            this.position = position;
            this.result = result;
        }

        // Hands the temporary direct buffer back to Util; no-op when the
        // caller's own (direct) buffer was used.
        void releaseBufferIfSubstituted() {
            if (buf != dst)
                Util.releaseTemporaryDirectBuffer(buf);
        }

        // Makes the bytes read visible in dst: advances dst's position when
        // the read went straight into dst, otherwise copies them out of the
        // substituted buffer. Does nothing for bytesTransferred <= 0.
        void updatePosition(int bytesTransferred) {
            // if the I/O succeeded then adjust buffer position
            if (bytesTransferred > 0) {
                if (buf == dst) {
                    try {
                        dst.position(pos + bytesTransferred);
                    } catch (IllegalArgumentException x) {
                        // someone has changed the position; ignore
                    }
                } else {
                    // had to substitute direct buffer
                    buf.position(bytesTransferred).flip();
                    try {
                        dst.put(buf);
                    } catch (BufferOverflowException x) {
                        // someone has changed the position; ignore
                    }
                }
            }
        }

        /**
         * Initiates the read. If the native call reports the read as pending
         * this method returns without completing the result and without
         * releasing the buffer; otherwise the result is completed here, the
         * substituted buffer (if any) is released and the completion handler
         * is invoked.
         */
        @Override
        public void run() {
            int n = -1;
            long overlapped = 0L;
            long address;

            // Substitute a native buffer if not direct
            if (dst instanceof DirectBuffer) {
                buf = dst;
                address = ((DirectBuffer)dst).address() + pos;
            } else {
                buf = Util.getTemporaryDirectBuffer(rem);
                address = ((DirectBuffer)buf).address();
            }

            try {
                begin();

                // allocate OVERLAPPED
                overlapped = ioCache.add(result);

                // synchronize on result to allow this thread handle the case
                // where the read completes immediately.
                synchronized (result) {
                    n = readFile(handle, address, rem, position, overlapped);
                    if (n == IOStatus.UNAVAILABLE) {
                        // I/O is pending
                        return;
                    }
                    // read completed immediately:
                    // 1. update buffer position
                    // 2. release waiters
                    updatePosition(n);
                    result.setResult(n);
                }
            } catch (Throwable x) {
                // failed to initiate read
                result.setFailure(toIOException(x));
            } finally {
                // pairs with begin(); also runs on the early "pending" return
                end();
            }

            // read failed or EOF so completion port will not be notified
            // (n keeps its initial -1 when an exception was thrown)
            if (n < 0 && overlapped != 0L) {
                ioCache.remove(overlapped);
            }

            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // invoke completion handler
            Invoker.invoke(result.handler(), result);
        }

        /**
         * Executed when the I/O has completed
         *
         * @param bytesTransferred  number of bytes read; failed() passes -1
         *                          here to report end-of-file
         */
        @Override
        public void completed(int bytesTransferred) {
            updatePosition(bytesTransferred);

            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // release waiters and invoke completion handler
            result.setResult(bytesTransferred);
            Invoker.invoke(result.handler(), result);
        }

        /**
         * Executed when the I/O has failed. ERROR_HANDLE_EOF is not treated
         * as a failure: it completes the read with a result of -1.
         *
         * @param error  error code of the failed operation
         * @param x      the failure to report while the channel is open
         */
        @Override
        public void failed(int error, IOException x) {
            // if EOF detected asynchronously then it is reported as error
            if (error == ERROR_HANDLE_EOF) {
                completed(-1);
            } else {
                // return direct buffer to cache if substituted
                releaseBufferIfSubstituted();

                // release waiters
                if (isOpen()) {
                    result.setFailure(x);
                } else {
                    // channel was closed in the meantime: report that instead
                    result.setFailure(new AsynchronousCloseException());
                }
                Invoker.invoke(result.handler(), result);
            }
        }
    }

    /**
     * Initiates an asynchronous read from the file into the given buffer.
     *
     * The buffer's position and remaining count are captured here; the
     * native read itself is issued by a ReadTask run on a thread of the
     * channel's thread pool.
     *
     * @param dst         buffer to read into; must not be read-only
     * @param position    file position to read from; must not be negative
     * @param attachment  object handed to the completion handler
     * @param handler     completion handler, passed on to the result future
     * @return a future for the pending read; an already-failed future
     *         (ClosedChannelException) when the channel is closed; an
     *         already-completed future with result 0 when {@code dst} has
     *         no space remaining
     * @throws NonReadableChannelException  if the channel is not open for
     *         reading
     * @throws IllegalArgumentException     if {@code position} is negative
     *         or {@code dst} is read-only
     */
    @Override
    public <A> Future<Integer> read(ByteBuffer dst,
                                    long position,
                                    A attachment,
                                    CompletionHandler<Integer,? super A> handler)
    {
        if (!reading)
            throw new NonReadableChannelException();
        if (position < 0)
            throw new IllegalArgumentException("Negative position");
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");

        // check if channel is closed
        if (!isOpen()) {
            CompletedFuture<Integer,A> result = CompletedFuture
                .withFailure(this, new ClosedChannelException(), attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        int pos = dst.position();
        int lim = dst.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        // no space remaining
        if (rem == 0) {
            CompletedFuture<Integer,A> result =
                CompletedFuture.withResult(this, 0, attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        // create Future and task that initiates read
        PendingFuture<Integer,A> result =
            new PendingFuture<Integer,A>(this, handler, attachment);
        // parameterized (not raw) so the task stays type-checked against A
        ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result);
        result.setContext(readTask);

        // initiate I/O (can only be done from thread in thread pool)
        Invoker.invokeOnThreadInThreadPool(this, readTask);
        return result;
    }

    /**
     * Task that initiates write operation and handles completion result.
     *
     * run() issues the overlapped write; completed()/failed() are the
     * Iocp.ResultHandler callbacks for a write that did not finish inside
     * run(). NOTE(review): presumably those callbacks are invoked by the
     * completion port's handler thread -- confirm against Iocp.
     */
    private class WriteTask<A> implements Runnable, Iocp.ResultHandler {
        // caller's source buffer
        private final ByteBuffer src;
        private final int pos, rem;     // buffer position/remaining
        private final long position;    // file position
        // future completed with the byte count or with a failure
        private final PendingFuture<Integer,A> result;

        // set to src if direct; otherwise set to substituted direct buffer
        private volatile ByteBuffer buf;

        WriteTask(ByteBuffer src,
                  int pos,
                  int rem,
                  long position,
                  PendingFuture<Integer,A> result)
        {
            this.src = src;
            this.pos = pos;
            this.rem = rem;
            this.position = position;
            this.result = result;
        }

        // Hands the temporary direct buffer back to Util; no-op when the
        // caller's own (direct) buffer was used.
        void releaseBufferIfSubstituted() {
            if (buf != src)
                Util.releaseTemporaryDirectBuffer(buf);
        }

        // Advances src's position past the bytes written. Does nothing for
        // bytesTransferred <= 0.
        void updatePosition(int bytesTransferred) {
            // if the I/O succeeded then adjust buffer position
            if (bytesTransferred > 0) {
                try {
                    src.position(pos + bytesTransferred);
                } catch (IllegalArgumentException x) {
                    // someone has changed the position
                }
            }
        }

        /**
         * Initiates the write. If the native call reports the write as
         * pending this method returns without completing the result and
         * without releasing the buffer. Otherwise the result is completed
         * here (success or failure), the substituted buffer (if any) is
         * returned to the cache and the completion handler is invoked.
         */
        @Override
        public void run() {
            int n = -1;
            long overlapped = 0L;
            long address;

            // Substitute a native buffer if not direct
            if (src instanceof DirectBuffer) {
                buf = src;
                address = ((DirectBuffer)src).address() + pos;
            } else {
                buf = Util.getTemporaryDirectBuffer(rem);
                buf.put(src);
                buf.flip();
                // temporarily restore position as we don't know how many bytes
                // will be written
                src.position(pos);
                address = ((DirectBuffer)buf).address();
            }

            try {
                begin();

                // allocate an OVERLAPPED structure
                overlapped = ioCache.add(result);

                // synchronize on result to allow this thread handle the case
                // where the write completes immediately.
                synchronized (result) {
                    n = writeFile(handle, address, rem, position, overlapped);
                    if (n == IOStatus.UNAVAILABLE) {
                        // I/O is pending; the buffer is still in use and is
                        // released by completed()/failed()
                        return;
                    }
                    // write completed immediately:
                    // 1. update buffer position
                    // 2. release waiters
                    updatePosition(n);
                    result.setResult(n);
                }
            } catch (Throwable x) {
                // failed to initiate write:
                result.setFailure(toIOException(x));

                // release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);

            } finally {
                // pairs with begin(); also runs on the early "pending" return
                end();
            }

            // The write either completed immediately or could not be
            // initiated, so completed()/failed() will not release the buffer
            // for this operation: return the direct buffer to the cache here
            // if it was substituted (previously it was only released on the
            // failure path).
            releaseBufferIfSubstituted();

            // invoke completion handler
            Invoker.invoke(result.handler(), result);
        }

        /**
         * Executed when the I/O has completed
         *
         * @param bytesTransferred  number of bytes written
         */
        @Override
        public void completed(int bytesTransferred) {
            updatePosition(bytesTransferred);

            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // release waiters and invoke completion handler
            result.setResult(bytesTransferred);
            Invoker.invoke(result.handler(), result);
        }

        /**
         * Executed when the I/O has failed.
         *
         * @param error  not used by this task
         * @param x      the failure to report while the channel is open
         */
        @Override
        public void failed(int error, IOException x) {
            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // release waiters and invoke completion handler
            if (isOpen()) {
                result.setFailure(x);
            } else {
                // channel was closed in the meantime: report that instead
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invoke(result.handler(), result);
        }
    }

    /**
     * Initiates an asynchronous write of the given buffer to the file.
     *
     * The buffer's position and remaining count are captured here; the
     * native write itself is issued by a WriteTask run on a thread of the
     * channel's thread pool.
     *
     * @param src         buffer holding the bytes to write
     * @param position    file position to write at; must not be negative
     * @param attachment  object handed to the completion handler
     * @param handler     completion handler, passed on to the result future
     * @return a future for the pending write; an already-failed future
     *         (ClosedChannelException) when the channel is closed; an
     *         already-completed future with result 0 when {@code src} has
     *         nothing remaining
     * @throws NonWritableChannelException  if the channel is not open for
     *         writing
     * @throws IllegalArgumentException     if {@code position} is negative
     */
    @Override
    public <A> Future<Integer> write(ByteBuffer src,
                                     long position,
                                     A attachment,
                                     CompletionHandler<Integer,? super A> handler)
    {
        if (!writing)
            throw new NonWritableChannelException();
        if (position < 0)
            throw new IllegalArgumentException("Negative position");

        // check if channel is closed
        if (!isOpen()) {
            CompletedFuture<Integer,A> result = CompletedFuture
                .withFailure(this, new ClosedChannelException(), attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        // nothing to write
        if (rem == 0) {
            CompletedFuture<Integer,A> result =
                CompletedFuture.withResult(this, 0, attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        // create Future and task to initiate write
        PendingFuture<Integer,A> result =
            new PendingFuture<Integer,A>(this, handler, attachment);
        // parameterized (not raw) so the task stays type-checked against A
        WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result);
        result.setContext(writeTask);

        // initiate I/O (can only be done from thread in thread pool)
        Invoker.invokeOnThreadInThreadPool(this, writeTask);
        return result;
    }

    // -- Native methods --

    // Starts a read of len bytes at file offset "offset" into native memory
    // at "address", using the given OVERLAPPED structure. ReadTask.run treats
    // a return value of IOStatus.UNAVAILABLE as "I/O pending", a negative
    // value as failure/EOF, and any other value as the number of bytes read.
    private static native int readFile(long handle, long address, int len,
        long offset, long overlapped) throws IOException;

    // Starts a write of len bytes from native memory at "address" to file
    // offset "offset". WriteTask.run treats IOStatus.UNAVAILABLE as "I/O
    // pending" and any other return value as the number of bytes written.
    private static native int writeFile(long handle, long address, int len,
        long offset, long overlapped) throws IOException;

    // Starts a lock request for the given region. LockTask.run treats
    // IOStatus.UNAVAILABLE as "I/O pending" and any other return value as
    // the lock having been acquired immediately.
    private static native int lockFile(long handle, long position, long size,
        boolean shared, long overlapped) throws IOException;

    // Closes the file handle; invoked from close().
    private static native void close0(long handle);

    // Runs Util.load() when this class is initialized.
    // NOTE(review): presumably this loads the native library that provides
    // the native methods declared in this class -- confirm against Util.
    static {
        Util.load();
    }
}