jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java
changeset 2057 3acf8e5e2ca0
child 2594 3755ecdb395d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java	Sun Feb 15 12:25:54 2009 +0000
@@ -0,0 +1,741 @@
+/*
+ * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.*;
+import java.util.concurrent.*;
+import java.nio.ByteBuffer;
+import java.nio.BufferOverflowException;
+import java.io.IOException;
+import java.io.FileDescriptor;
+import sun.misc.SharedSecrets;
+import sun.misc.JavaIOFileDescriptorAccess;
+
+/**
+ * Windows implementation of AsynchronousFileChannel using overlapped I/O.
+ */
+
+public class WindowsAsynchronousFileChannelImpl
+    extends AsynchronousFileChannelImpl
+    implements Iocp.OverlappedChannel, Groupable
+{
+    private static final JavaIOFileDescriptorAccess fdAccess =
+        SharedSecrets.getJavaIOFileDescriptorAccess();
+
+    // error when EOF is detected asynchronously.
+    private static final int ERROR_HANDLE_EOF = 38;
+
+    // Lazy initialization of default I/O completion port
+    private static class DefaultIocpHolder {
+        static final Iocp defaultIocp = defaultIocp();
+        private static Iocp defaultIocp() {
+            try {
+                return new Iocp(null, ThreadPool.createDefault()).start();
+            } catch (IOException ioe) {
+                InternalError e = new InternalError();
+                e.initCause(ioe);
+                throw e;
+            }
+        }
+    }
+
+    // Used for force/truncate/size methods
+    private static final FileDispatcher nd = new FileDispatcherImpl();
+
+    // The handle is extracted for use in native methods invoked from this class.
+    private final long handle;
+
+    // The key that identifies the channel's association with the I/O port
+    private final int completionKey;
+
+    // I/O completion port (group)
+    private final Iocp iocp;
+
+    private final boolean isDefaultIocp;
+
+    // Caches OVERLAPPED structure for each outstanding I/O operation
+    private final PendingIoCache ioCache;
+
+
+    private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
+                                               boolean reading,
+                                               boolean writing,
+                                               Iocp iocp,
+                                               boolean isDefaultIocp)
+        throws IOException
+    {
+        super(fdObj, reading, writing, iocp.executor());
+        this.handle = fdAccess.getHandle(fdObj);
+        this.iocp = iocp;
+        this.isDefaultIocp = isDefaultIocp;
+        this.ioCache = new PendingIoCache();
+        this.completionKey = iocp.associate(this, handle);
+    }
+
+    public static AsynchronousFileChannel open(FileDescriptor fdo,
+                                               boolean reading,
+                                               boolean writing,
+                                               ThreadPool pool)
+        throws IOException
+    {
+        Iocp iocp;
+        boolean isDefaultIocp;
+        if (pool == null) {
+            iocp = DefaultIocpHolder.defaultIocp;
+            isDefaultIocp = true;
+        } else {
+            iocp = new Iocp(null, pool).start();
+            isDefaultIocp = false;
+        }
+        try {
+            return new
+                WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);
+        } catch (IOException x) {
+            // error binding to port so need to close it (if created for this channel)
+            if (!isDefaultIocp)
+                iocp.implClose();
+            throw x;
+        }
+    }
+
+    @Override
+    public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
+        return ioCache.remove(overlapped);
+    }
+
+    @Override
+    public void close() throws IOException {
+        closeLock.writeLock().lock();
+        try {
+            if (closed)
+                return;     // already closed
+            closed = true;
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+
+        // invalidate all locks held for this channel
+        invalidateAllLocks();
+
+        // close the file
+        close0(handle);
+
+        // waits until all I/O operations have completed
+        ioCache.close();
+
+        // disassociate from port and shutdown thread pool if not default
+        iocp.disassociate(completionKey);
+        if (!isDefaultIocp)
+            iocp.shutdown();
+    }
+
+    @Override
+    public AsynchronousChannelGroupImpl group() {
+        return iocp;
+    }
+
+    /**
+     * Translates Throwable to IOException
+     */
+    private static IOException toIOException(Throwable x) {
+        if (x instanceof IOException) {
+            if (x instanceof ClosedChannelException)
+                x = new AsynchronousCloseException();
+            return (IOException)x;
+        }
+        return new IOException(x);
+    }
+
+    @Override
+    public long size() throws IOException {
+        try {
+            begin();
+            return nd.size(fdObj);
+        } finally {
+            end();
+        }
+    }
+
+    @Override
+    public AsynchronousFileChannel truncate(long size) throws IOException {
+        if (size < 0)
+            throw new IllegalArgumentException("Negative size");
+        if (!writing)
+            throw new NonWritableChannelException();
+        try {
+            begin();
+            if (size > nd.size(fdObj))
+                return this;
+            nd.truncate(fdObj, size);
+        } finally {
+            end();
+        }
+        return this;
+    }
+
+    @Override
+    public void force(boolean metaData) throws IOException {
+        try {
+            begin();
+            nd.force(fdObj, metaData);
+        } finally {
+            end();
+        }
+    }
+
+    // -- file locking --
+
+    /**
+     * Task that initiates locking operation and handles completion result.
+     */
+    private class LockTask<A> implements Runnable, Iocp.ResultHandler {
+        private final long position;
+        private final FileLockImpl fli;
+        private final PendingFuture<FileLock,A> result;
+
+        LockTask(long position,
+                 FileLockImpl fli,
+                 PendingFuture<FileLock,A> result)
+        {
+            this.position = position;
+            this.fli = fli;
+            this.result = result;
+        }
+
+        @Override
+        public void run() {
+            long overlapped = 0L;
+            try {
+                begin();
+
+                // allocate OVERLAPPED structure
+                overlapped = ioCache.add(result);
+
+                // synchronize on result to avoid race with handler thread
+                // when lock is acquired immediately.
+                synchronized (result) {
+                    int n = lockFile(handle, position, fli.size(), fli.isShared(),
+                                     overlapped);
+                    if (n == IOStatus.UNAVAILABLE) {
+                        // I/O is pending
+                        return;
+                    }
+                    // acquired lock immediately
+                    result.setResult(fli);
+                }
+
+            } catch (Throwable x) {
+                // lock failed or channel closed
+                removeFromFileLockTable(fli);
+                if (overlapped != 0L)
+                    ioCache.remove(overlapped);
+                result.setFailure(toIOException(x));
+            } finally {
+                end();
+            }
+
+            // invoke completion handler
+            Invoker.invoke(result.handler(), result);
+        }
+
+        @Override
+        public void completed(int bytesTransferred) {
+            // release waiters and invoke completion handler
+            result.setResult(fli);
+            Invoker.invoke(result.handler(), result);
+        }
+
+        @Override
+        public void failed(int error, IOException x) {
+            // lock not acquired so remove from lock table
+            removeFromFileLockTable(fli);
+
+            // release waiters
+            if (isOpen()) {
+                result.setFailure(x);
+            } else {
+                result.setFailure(new AsynchronousCloseException());
+            }
+            Invoker.invoke(result.handler(), result);
+        }
+    }
+
+    @Override
+    public <A> Future<FileLock> lock(long position,
+                                     long size,
+                                     boolean shared,
+                                     A attachment,
+                                     CompletionHandler<FileLock,? super A> handler)
+    {
+        if (shared && !reading)
+            throw new NonReadableChannelException();
+        if (!shared && !writing)
+            throw new NonWritableChannelException();
+
+        // add to lock table
+        FileLockImpl fli = addToFileLockTable(position, size, shared);
+        if (fli == null) {
+            CompletedFuture<FileLock,A> result = CompletedFuture
+                .withFailure(this, new ClosedChannelException(), attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        // create Future and task that will be invoked to acquire lock
+        PendingFuture<FileLock,A> result =
+            new PendingFuture<FileLock,A>(this, handler, attachment);
+        LockTask lockTask = new LockTask<A>(position, fli, result);
+        result.setContext(lockTask);
+
+        // initiate I/O (can only be done from thread in thread pool)
+        try {
+            Invoker.invokeOnThreadInThreadPool(this, lockTask);
+        } catch (ShutdownChannelGroupException e) {
+            // rollback
+            removeFromFileLockTable(fli);
+            throw e;
+        }
+        return result;
+    }
+
+    static final int NO_LOCK = -1;       // Failed to lock
+    static final int LOCKED = 0;         // Obtained requested lock
+
+    @Override
+    public FileLock tryLock(long position, long size, boolean shared)
+        throws IOException
+    {
+        if (shared && !reading)
+            throw new NonReadableChannelException();
+        if (!shared && !writing)
+            throw new NonWritableChannelException();
+
+        // add to lock table
+        final FileLockImpl fli = addToFileLockTable(position, size, shared);
+        if (fli == null)
+            throw new ClosedChannelException();
+
+        boolean gotLock = false;
+        try {
+            begin();
+            // try to acquire the lock
+            int res = nd.lock(fdObj, false, position, size, shared);
+            if (res == NO_LOCK)
+                return null;
+            gotLock = true;
+            return fli;
+        } finally {
+            if (!gotLock)
+                removeFromFileLockTable(fli);
+            end();
+        }
+    }
+
+    // invoke by FileFileImpl to release lock
+    @Override
+    void release(FileLockImpl fli) throws IOException {
+        try {
+            begin();
+            nd.release(fdObj, fli.position(), fli.size());
+            removeFromFileLockTable(fli);
+        } finally {
+            end();
+        }
+    }
+
+    /**
+     * Task that initiates read operation and handles completion result.
+     */
+    private class ReadTask<A> implements Runnable, Iocp.ResultHandler {
+        private final ByteBuffer dst;
+        private final int pos, rem;     // buffer position/remaining
+        private final long position;    // file position
+        private final PendingFuture<Integer,A> result;
+
+        // set to dst if direct; otherwise set to substituted direct buffer
+        private volatile ByteBuffer buf;
+
+        ReadTask(ByteBuffer dst,
+                 int pos,
+                 int rem,
+                 long position,
+                 PendingFuture<Integer,A> result)
+        {
+            this.dst = dst;
+            this.pos = pos;
+            this.rem = rem;
+            this.position = position;
+            this.result = result;
+        }
+
+        void releaseBufferIfSubstituted() {
+            if (buf != dst)
+                Util.releaseTemporaryDirectBuffer(buf);
+        }
+
+        void updatePosition(int bytesTransferred) {
+            // if the I/O succeeded then adjust buffer position
+            if (bytesTransferred > 0) {
+                if (buf == dst) {
+                    try {
+                        dst.position(pos + bytesTransferred);
+                    } catch (IllegalArgumentException x) {
+                        // someone has changed the position; ignore
+                    }
+                } else {
+                    // had to substitute direct buffer
+                    buf.position(bytesTransferred).flip();
+                    try {
+                        dst.put(buf);
+                    } catch (BufferOverflowException x) {
+                        // someone has changed the position; ignore
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            int n = -1;
+            long overlapped = 0L;
+            long address;
+
+            // Substitute a native buffer if not direct
+            if (dst instanceof DirectBuffer) {
+                buf = dst;
+                address = ((DirectBuffer)dst).address() + pos;
+            } else {
+                buf = Util.getTemporaryDirectBuffer(rem);
+                address = ((DirectBuffer)buf).address();
+            }
+
+            try {
+                begin();
+
+                // allocate OVERLAPPED
+                overlapped = ioCache.add(result);
+
+                // synchronize on result to allow this thread handle the case
+                // where the read completes immediately.
+                synchronized (result) {
+                    n = readFile(handle, address, rem, position, overlapped);
+                    if (n == IOStatus.UNAVAILABLE) {
+                        // I/O is pending
+                        return;
+                    }
+                    // read completed immediately:
+                    // 1. update buffer position
+                    // 2. release waiters
+                    updatePosition(n);
+                    result.setResult(n);
+                }
+            } catch (Throwable x) {
+                // failed to initiate read
+                result.setFailure(toIOException(x));
+            } finally {
+                end();
+            }
+
+            // read failed or EOF so completion port will not be notified
+            if (n < 0 && overlapped != 0L) {
+                ioCache.remove(overlapped);
+            }
+
+            // return direct buffer to cache if substituted
+            releaseBufferIfSubstituted();
+
+            // invoke completion handler
+            Invoker.invoke(result.handler(), result);
+        }
+
+        /**
+         * Executed when the I/O has completed
+         */
+        @Override
+        public void completed(int bytesTransferred) {
+            updatePosition(bytesTransferred);
+
+            // return direct buffer to cache if substituted
+            releaseBufferIfSubstituted();
+
+            // release waiters and invoke completion handler
+            result.setResult(bytesTransferred);
+            Invoker.invoke(result.handler(), result);
+        }
+
+        @Override
+        public void failed(int error, IOException x) {
+            // if EOF detected asynchronously then it is reported as error
+            if (error == ERROR_HANDLE_EOF) {
+                completed(-1);
+            } else {
+                // return direct buffer to cache if substituted
+                releaseBufferIfSubstituted();
+
+                // release waiters
+                if (isOpen()) {
+                    result.setFailure(x);
+                } else {
+                    result.setFailure(new AsynchronousCloseException());
+                }
+                Invoker.invoke(result.handler(), result);
+            }
+        }
+    }
+
+    @Override
+    public <A> Future<Integer> read(ByteBuffer dst,
+                                    long position,
+                                    A attachment,
+                                    CompletionHandler<Integer,? super A> handler)
+    {
+        if (!reading)
+            throw new NonReadableChannelException();
+        if (position < 0)
+            throw new IllegalArgumentException("Negative position");
+        if (dst.isReadOnly())
+            throw new IllegalArgumentException("Read-only buffer");
+
+        // check if channel is closed
+        if (!isOpen()) {
+            CompletedFuture<Integer,A> result = CompletedFuture
+                .withFailure(this, new ClosedChannelException(), attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        int pos = dst.position();
+        int lim = dst.limit();
+        assert (pos <= lim);
+        int rem = (pos <= lim ? lim - pos : 0);
+
+        // no space remaining
+        if (rem == 0) {
+            CompletedFuture<Integer,A> result =
+                CompletedFuture.withResult(this, 0, attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        // create Future and task that initiates read
+        PendingFuture<Integer,A> result =
+            new PendingFuture<Integer,A>(this, handler, attachment);
+        ReadTask readTask = new ReadTask<A>(dst, pos, rem, position, result);
+        result.setContext(readTask);
+
+        // initiate I/O (can only be done from thread in thread pool)
+        Invoker.invokeOnThreadInThreadPool(this, readTask);
+        return result;
+    }
+
+    /**
+     * Task that initiates write operation and handles completion result.
+     */
+    private class WriteTask<A> implements Runnable, Iocp.ResultHandler {
+        private final ByteBuffer src;
+        private final int pos, rem;     // buffer position/remaining
+        private final long position;    // file position
+        private final PendingFuture<Integer,A> result;
+
+        // set to src if direct; otherwise set to substituted direct buffer
+        private volatile ByteBuffer buf;
+
+        WriteTask(ByteBuffer src,
+                  int pos,
+                  int rem,
+                  long position,
+                  PendingFuture<Integer,A> result)
+        {
+            this.src = src;
+            this.pos = pos;
+            this.rem = rem;
+            this.position = position;
+            this.result = result;
+        }
+
+        void releaseBufferIfSubstituted() {
+            if (buf != src)
+                Util.releaseTemporaryDirectBuffer(buf);
+        }
+
+        void updatePosition(int bytesTransferred) {
+            // if the I/O succeeded then adjust buffer position
+            if (bytesTransferred > 0) {
+                try {
+                    src.position(pos + bytesTransferred);
+                } catch (IllegalArgumentException x) {
+                    // someone has changed the position
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            int n = -1;
+            long overlapped = 0L;
+            long address;
+
+            // Substitute a native buffer if not direct
+            if (src instanceof DirectBuffer) {
+                buf = src;
+                address = ((DirectBuffer)src).address() + pos;
+            } else {
+                buf = Util.getTemporaryDirectBuffer(rem);
+                buf.put(src);
+                buf.flip();
+                // temporarily restore position as we don't know how many bytes
+                // will be written
+                src.position(pos);
+                address = ((DirectBuffer)buf).address();
+            }
+
+            try {
+                begin();
+
+                // allocate an OVERLAPPED structure
+                overlapped = ioCache.add(result);
+
+                // synchronize on result to allow this thread handle the case
+                // where the read completes immediately.
+                synchronized (result) {
+                    n = writeFile(handle, address, rem, position, overlapped);
+                    if (n == IOStatus.UNAVAILABLE) {
+                        // I/O is pending
+                        return;
+                    }
+                    // read completed immediately:
+                    // 1. update buffer position
+                    // 2. release waiters
+                    updatePosition(n);
+                    result.setResult(n);
+                }
+            } catch (Throwable x) {
+                // failed to initiate read:
+                result.setFailure(toIOException(x));
+
+                // release resources
+                if (overlapped != 0L)
+                    ioCache.remove(overlapped);
+                releaseBufferIfSubstituted();
+
+            } finally {
+                end();
+            }
+
+            // invoke completion handler
+            Invoker.invoke(result.handler(), result);
+        }
+
+        /**
+         * Executed when the I/O has completed
+         */
+        @Override
+        public void completed(int bytesTransferred) {
+            updatePosition(bytesTransferred);
+
+            // return direct buffer to cache if substituted
+            releaseBufferIfSubstituted();
+
+            // release waiters and invoke completion handler
+            result.setResult(bytesTransferred);
+            Invoker.invoke(result.handler(), result);
+        }
+
+        @Override
+        public void failed(int error, IOException x) {
+            // return direct buffer to cache if substituted
+            releaseBufferIfSubstituted();
+
+            // release waiters and invoker completion handler
+            if (isOpen()) {
+                result.setFailure(x);
+            } else {
+                result.setFailure(new AsynchronousCloseException());
+            }
+            Invoker.invoke(result.handler(), result);
+        }
+    }
+
+    @Override
+    public <A> Future<Integer> write(ByteBuffer src,
+                                     long position,
+                                     A attachment,
+                                     CompletionHandler<Integer,? super A> handler)
+    {
+        if (!writing)
+            throw new NonWritableChannelException();
+        if (position < 0)
+            throw new IllegalArgumentException("Negative position");
+
+        // check if channel is closed
+        if (!isOpen()) {
+            CompletedFuture<Integer,A> result = CompletedFuture
+                .withFailure(this, new ClosedChannelException(), attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        int pos = src.position();
+        int lim = src.limit();
+        assert (pos <= lim);
+        int rem = (pos <= lim ? lim - pos : 0);
+
+        // nothing to write
+        if (rem == 0) {
+            CompletedFuture<Integer,A> result =
+                CompletedFuture.withResult(this, 0, attachment);
+            Invoker.invoke(handler, result);
+            return result;
+        }
+
+        // create Future and task to initiate write
+        PendingFuture<Integer,A> result =
+            new PendingFuture<Integer,A>(this, handler, attachment);
+        WriteTask writeTask = new WriteTask<A>(src, pos, rem, position, result);
+        result.setContext(writeTask);
+
+        // initiate I/O (can only be done from thread in thread pool)
+        Invoker.invokeOnThreadInThreadPool(this, writeTask);
+        return result;
+    }
+
+    // -- Native methods --
+
+    private static native int readFile(long handle, long address, int len,
+        long offset, long overlapped) throws IOException;
+
+    private static native int writeFile(long handle, long address, int len,
+        long offset, long overlapped) throws IOException;
+
+    private static native int lockFile(long handle, long position, long size,
+        boolean shared, long overlapped) throws IOException;
+
+    private static native void close0(long handle);
+
+    static {
+        Util.load();
+    }
+}