jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
author alanb
Sun, 15 Feb 2009 12:25:54 +0000
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
permissions -rw-r--r--
6781363: New I/O: Update socket-channel API to jsr203/nio2-b99 4313887: New I/O: Improved filesystem interface 4607272: New I/O: Support asynchronous I/O Reviewed-by: sherman, chegar

/*
 * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.io.IOException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import sun.misc.Unsafe;

/**
 * Windows implementation of AsynchronousServerSocketChannel using overlapped I/O.
 */

class WindowsAsynchronousServerSocketChannelImpl
    extends AsynchronousServerSocketChannelImpl implements Iocp.OverlappedChannel
{
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // 2 * (sizeof(SOCKET_ADDRESS) + 16)
    private static final int DATA_BUFFER_SIZE = 88;

    private final long handle;
    private final int completionKey;
    private final Iocp iocp;

    // typically there will be zero, or one I/O operations pending. In rare
    // cases there may be more. These rare cases arise when a sequence of accept
    // operations complete immediately and handled by the initiating thread.
    // The corresponding OVERLAPPED cannot be reused/released until the completion
    // event has been posted.
    private final PendingIoCache ioCache;

    // the data buffer to receive the local/remote socket address
    private final long dataBuffer;

    // flag to indicate that an accept operation is outstanding
    private AtomicBoolean accepting = new AtomicBoolean();


    WindowsAsynchronousServerSocketChannelImpl(Iocp iocp) throws IOException {
        super(iocp);

        // associate socket with given completion port
        long h = IOUtil.fdVal(fd);
        int key;
        try {
            key = iocp.associate(this, h);
        } catch (IOException x) {
            closesocket0(h);   // prevent leak
            throw x;
        }

        this.handle = h;
        this.completionKey = key;
        this.iocp = iocp;
        this.ioCache = new PendingIoCache();
        this.dataBuffer = unsafe.allocateMemory(DATA_BUFFER_SIZE);
    }

    @Override
    public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
        return ioCache.remove(overlapped);
    }

    @Override
    void implClose() throws IOException {
        // close socket (which may cause outstanding accept to be aborted).
        closesocket0(handle);

        // waits until the accept operations have completed
        ioCache.close();

        // finally disassociate from the completion port
        iocp.disassociate(completionKey);

        // release other resources
        unsafe.freeMemory(dataBuffer);
    }

    @Override
    public AsynchronousChannelGroupImpl group() {
        return iocp;
    }

    /**
     * Task to initiate accept operation and to handle result.
     */
    private class AcceptTask<A> implements Runnable, Iocp.ResultHandler {
        private final WindowsAsynchronousSocketChannelImpl channel;
        private final AccessControlContext acc;
        private final PendingFuture<AsynchronousSocketChannel,A> result;

        AcceptTask(WindowsAsynchronousSocketChannelImpl channel,
                   AccessControlContext acc,
                   PendingFuture<AsynchronousSocketChannel,A> result)
        {
            this.channel = channel;
            this.acc = acc;
            this.result = result;
        }

        void enableAccept() {
            accepting.set(false);
        }

        void closeChildChannel() {
            try {
                channel.close();
            } catch (IOException ignore) { }
        }

        // caller must have acquired read lock for the listener and child channel.
        void finishAccept() throws IOException {
            /**
             * Set local/remote addresses. This is currently very inefficient
             * in that it requires 2 calls to getsockname and 2 calls to getpeername.
             * (should change this to use GetAcceptExSockaddrs)
             */
            updateAcceptContext(handle, channel.handle());

            InetSocketAddress local = Net.localAddress(channel.fd);
            final InetSocketAddress remote = Net.remoteAddress(channel.fd);
            channel.setConnected(local, remote);

            // permission check (in context of initiating thread)
            if (acc != null) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        SecurityManager sm = System.getSecurityManager();
                        sm.checkAccept(remote.getAddress().getHostAddress(),
                                       remote.getPort());
                        return null;
                    }
                }, acc);
            }
        }

        /**
         * Initiates the accept operation.
         */
        @Override
        public void run() {
            long overlapped = 0L;

            try {
                // begin usage of listener socket
                begin();
                try {
                    // begin usage of child socket (as it is registered with
                    // completion port and so may be closed in the event that
                    // the group is forcefully closed).
                    channel.begin();

                    synchronized (result) {
                        overlapped = ioCache.add(result);

                        int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
                        if (n == IOStatus.UNAVAILABLE) {
                            return;
                        }

                        // connection accepted immediately
                        finishAccept();

                        // allow another accept before the result is set
                        enableAccept();
                        result.setResult(channel);
                    }
                } finally {
                    // end usage on child socket
                    channel.end();
                }
            } catch (Throwable x) {
                // failed to initiate accept so release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                enableAccept();
                result.setFailure(x);
            } finally {
                // end of usage of listener socket
                end();
            }

            // accept completed immediately but may not have executed on
            // initiating thread in which case the operation may have been
            // cancelled.
            if (result.isCancelled()) {
                closeChildChannel();
            }

            // invoke completion handler
            Invoker.invokeIndirectly(result.handler(), result);
        }

        /**
         * Executed when the I/O has completed
         */
        @Override
        public void completed(int bytesTransferred) {
            try {
                // connection accept after group has shutdown
                if (iocp.isShutdown()) {
                    throw new IOException(new ShutdownChannelGroupException());
                }

                // finish the accept
                try {
                    begin();
                    try {
                        channel.begin();
                        finishAccept();
                    } finally {
                        channel.end();
                    }
                } finally {
                    end();
                }

                // allow another accept before the result is set
                enableAccept();
                result.setResult(channel);
            } catch (Throwable x) {
                enableAccept();
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                result.setFailure(x);
            }

            // if an async cancel has already cancelled the operation then
            // close the new channel so as to free resources
            if (result.isCancelled()) {
                closeChildChannel();
            }

            // invoke handler (but not directly)
            Invoker.invokeIndirectly(result.handler(), result);
        }

        @Override
        public void failed(int error, IOException x) {
            enableAccept();
            closeChildChannel();

            // release waiters
            if (isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invokeIndirectly(result.handler(), result);
        }
    }

    @Override
    public <A> Future<AsynchronousSocketChannel> accept(A attachment,
        final CompletionHandler<AsynchronousSocketChannel,? super A> handler)
    {
        if (!isOpen()) {
            CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture
                .withFailure(this, new ClosedChannelException(), attachment);
            Invoker.invokeIndirectly(handler, result);
            return result;
        }
        if (isAcceptKilled())
            throw new RuntimeException("Accept not allowed due to cancellation");

        // ensure channel is bound to local address
        if (localAddress == null)
            throw new NotYetBoundException();

        // create the socket that will be accepted. The creation of the socket
        // is enclosed by a begin/end for the listener socket to ensure that
        // we check that the listener is open and also to prevent the I/O
        // port from being closed as the new socket is registered.
        WindowsAsynchronousSocketChannelImpl ch = null;
        IOException ioe = null;
        try {
            begin();
            ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
        } catch (IOException x) {
            ioe = x;
        } finally {
            end();
        }
        if (ioe != null) {
            CompletedFuture<AsynchronousSocketChannel,A> result =
                CompletedFuture.withFailure(this, ioe, attachment);
            Invoker.invokeIndirectly(handler, result);
            return result;
        }

        // need calling context when there is security manager as
        // permission check may be done in a different thread without
        // any application call frames on the stack
        AccessControlContext acc = (System.getSecurityManager() == null) ?
            null : AccessController.getContext();

        PendingFuture<AsynchronousSocketChannel,A> result =
            new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment);
        AcceptTask task = new AcceptTask<A>(ch, acc, result);
        result.setContext(task);

        // check and set flag to prevent concurrent accepting
        if (!accepting.compareAndSet(false, true))
            throw new AcceptPendingException();

        // initiate accept. As I/O operations are tied to the initiating thread
        // then it will only be invoked direcly if this thread is in the thread
        // pool. If this thread is not in the thread pool when a task is
        // submitted to initiate the accept.
        Invoker.invokeOnThreadInThreadPool(this, task);
        return result;
    }

    // -- Native methods --

    private static native void initIDs();

    private static native int accept0(long listenSocket, long acceptSocket,
        long overlapped, long dataBuffer) throws IOException;

    private static native void updateAcceptContext(long listenSocket,
        long acceptSocket) throws IOException;

    private static native void closesocket0(long socket) throws IOException;

    static {
        Util.load();
        initIDs();
    }
}