jdk/src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java
author alanb
Sun, 15 Feb 2009 12:25:54 +0000
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
permissions -rw-r--r--
6781363: New I/O: Update socket-channel API to jsr203/nio2-b99 4313887: New I/O: Improved filesystem interface 4607272: New I/O: Support asynchronous I/O Reviewed-by: sherman, chegar

/*
 * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

package sun.nio.ch;

import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.net.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.security.AccessController;
import java.security.AccessControlContext;
import java.security.PrivilegedExceptionAction;
import java.security.PrivilegedActionException;

/**
 * A prototype implementation of AsynchronousDatagramChannel, used to aid
 * test and spec development.
 */

class SimpleAsynchronousDatagramChannelImpl
    extends AsynchronousDatagramChannel implements Groupable, Cancellable
{
    private final DatagramChannel dc;
    private final AsynchronousChannelGroupImpl group;
    private final Object attachKey;
    private boolean closed;

    // used to coordinate timed and blocking reads
    private final Object readLock = new Object();

    // channel blocking mode (requires readLock)
    private boolean isBlocking = true;

    // number of blocking readers (requires readLock)
    private int blockingReaderCount;

    // true if timed read attempted while blocking read in progress (requires readLock)
    private boolean transitionToNonBlocking;

    // true if a blocking read is cancelled (requires readLock)
    private boolean blockingReadKilledByCancel;

    // temporary Selectors used by timed reads (requires readLock)
    private Selector firstReader;
    private Set<Selector> otherReaders;

    SimpleAsynchronousDatagramChannelImpl(ProtocolFamily family,
                                          AsynchronousChannelGroupImpl group)
        throws IOException
    {
        super(group.provider());
        this.dc = (family == null) ?
            DatagramChannel.open() : DatagramChannel.open(family);
        this.group = group;

        // attach this channel to the group as foreign channel
        boolean registered = false;
        try {
            if (!(dc instanceof DatagramChannelImpl))
                throw new UnsupportedOperationException();
            attachKey = group
                .attachForeignChannel(this, ((DatagramChannelImpl)dc).getFD());
            registered = true;
        } finally {
            if (!registered)
                dc.close();
        }
    }

    // throws RuntimeException if blocking read has been cancelled
    private void ensureBlockingReadNotKilled() {
        assert Thread.holdsLock(readLock);
        if (blockingReadKilledByCancel)
            throw new RuntimeException("Reading not allowed due to cancellation");
    }

    // invoke prior to non-timed read/receive
    private void beginNoTimeoutRead() {
        synchronized (readLock) {
            ensureBlockingReadNotKilled();
            if (isBlocking)
                blockingReaderCount++;
        }
    }

    // invoke after non-timed read/receive has completed
    private void endNoTimeoutRead() {
        synchronized (readLock) {
            if (isBlocking) {
                if (--blockingReaderCount == 0 && transitionToNonBlocking) {
                    // notify any threads waiting to make channel non-blocking
                    readLock.notifyAll();
                }
            }
        }
    }

    // invoke prior to timed read
    // returns the timeout remaining
    private long prepareForTimedRead(PendingFuture<?,?> result, long timeout)
        throws IOException
    {
        synchronized (readLock) {
            ensureBlockingReadNotKilled();
            if (isBlocking) {
                transitionToNonBlocking = true;
                while (blockingReaderCount > 0 &&
                       timeout > 0L &&
                       !result.isCancelled())
                {
                    long st = System.currentTimeMillis();
                    try {
                        readLock.wait(timeout);
                    } catch (InterruptedException e) { }
                    timeout -= System.currentTimeMillis() - st;
                }
                if (blockingReaderCount == 0) {
                    // re-check that blocked read wasn't cancelled
                    ensureBlockingReadNotKilled();
                    // no blocking reads so change channel to non-blocking
                    dc.configureBlocking(false);
                    isBlocking = false;
                }
            }
            return timeout;
        }
    }

    // returns a temporary Selector
    private Selector getSelector() throws IOException {
        Selector sel = Util.getTemporarySelector(dc);
        synchronized (readLock) {
            if (firstReader == null) {
                firstReader = sel;
            } else {
                if (otherReaders == null)
                    otherReaders = new HashSet<Selector>();
                otherReaders.add(sel);
            }
        }
        return sel;
    }

    // releases a temporary Selector
    private void releaseSelector(Selector sel) throws IOException {
        synchronized (readLock) {
            if (firstReader == sel) {
                firstReader = null;
            } else {
                otherReaders.remove(sel);
            }
        }
        Util.releaseTemporarySelector(sel);
    }

    // wakeup all Selectors currently in use
    private void wakeupSelectors() {
        synchronized (readLock) {
            if (firstReader != null)
                firstReader.wakeup();
            if (otherReaders != null) {
                for (Selector sel: otherReaders) {
                    sel.wakeup();
                }
            }
        }
    }

    @Override
    public AsynchronousChannelGroupImpl group() {
        return group;
    }

    @Override
    public boolean isOpen() {
        return dc.isOpen();
    }

    @Override
    public void onCancel(PendingFuture<?,?> task) {
        synchronized (readLock) {
            if (blockingReaderCount > 0) {
                blockingReadKilledByCancel = true;
                readLock.notifyAll();
                return;
            }
        }
        wakeupSelectors();
    }

    @Override
    public void close() throws IOException {
        synchronized (dc) {
            if (closed)
                return;
            closed = true;
        }
        // detach from group and close underlying channel
        group.detachForeignChannel(attachKey);
        dc.close();

        // wakeup any threads blocked in timed read/receives
        wakeupSelectors();
    }

    @Override
    public AsynchronousDatagramChannel connect(SocketAddress remote)
        throws IOException
    {
        dc.connect(remote);
        return this;
    }

    @Override
    public AsynchronousDatagramChannel disconnect() throws IOException {
        dc.disconnect();
        return this;
    }

    private static class WrappedMembershipKey extends MembershipKey {
        private final MulticastChannel channel;
        private final MembershipKey key;

        WrappedMembershipKey(MulticastChannel channel, MembershipKey key) {
            this.channel = channel;
            this.key = key;
        }

        @Override
        public boolean isValid() {
            return key.isValid();
        }

        @Override
        public void drop() {
            key.drop();
        }

        @Override
        public MulticastChannel channel() {
            return channel;
        }

        @Override
        public InetAddress group() {
            return key.group();
        }

        @Override
        public NetworkInterface networkInterface() {
            return key.networkInterface();
        }

        @Override
        public InetAddress sourceAddress() {
            return key.sourceAddress();
        }

        @Override
        public MembershipKey block(InetAddress toBlock) throws IOException {
            key.block(toBlock);
            return this;
        }

        @Override
        public MembershipKey unblock(InetAddress toUnblock) {
            key.unblock(toUnblock);
            return this;
        }

        @Override
        public String toString() {
            return key.toString();
        }
    }

    @Override
    public MembershipKey join(InetAddress group,
                              NetworkInterface interf)
        throws IOException
    {
        MembershipKey key = ((MulticastChannel)dc).join(group, interf);
        return new WrappedMembershipKey(this, key);
    }

    @Override
    public MembershipKey join(InetAddress group,
                              NetworkInterface interf,
                              InetAddress source)
        throws IOException
    {
        MembershipKey key = ((MulticastChannel)dc).join(group, interf, source);
        return new WrappedMembershipKey(this, key);
    }

    @Override
    public <A> Future<Integer> send(ByteBuffer src,
                                    SocketAddress target,
                                    long timeout,
                                    TimeUnit unit,
                                    A attachment,
                                    CompletionHandler<Integer,? super A> handler)
    {
        if (timeout < 0L)
            throw new IllegalArgumentException("Negative timeout");
        if (unit == null)
            throw new NullPointerException();

        CompletedFuture<Integer,A> result;
        try {
            int n = dc.send(src, target);
            result = CompletedFuture.withResult(this, n, attachment);
        } catch (IOException ioe) {
            result = CompletedFuture.withFailure(this, ioe, attachment);
        }
        Invoker.invoke(handler, result);
        return result;
    }

    @Override
    public <A> Future<Integer> write(ByteBuffer src,
                                     long timeout,
                                     TimeUnit unit,
                                     A attachment,
                                     CompletionHandler<Integer,? super A> handler)
    {
        if (timeout < 0L)
            throw new IllegalArgumentException("Negative timeout");
        if (unit == null)
            throw new NullPointerException();

        CompletedFuture<Integer,A> result;
        try {
            int n = dc.write(src);
            result = CompletedFuture.withResult(this, n, attachment);
        } catch (IOException ioe) {
            result = CompletedFuture.withFailure(this, ioe, attachment);
        }
        Invoker.invoke(handler, result);
        return result;
    }

    /**
     * Receive into the given buffer with privileges enabled and restricted by
     * the given AccessControlContext (can be null).
     */
    private SocketAddress doRestrictedReceive(final ByteBuffer dst,
                                              AccessControlContext acc)
        throws IOException
    {
        if (acc == null) {
            return dc.receive(dst);
        } else {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<SocketAddress>() {
                        public SocketAddress run() throws IOException {
                            return dc.receive(dst);
                        }}, acc);
            } catch (PrivilegedActionException pae) {
                Exception cause = pae.getException();
                if (cause instanceof SecurityException)
                    throw (SecurityException)cause;
                throw (IOException)cause;
            }
        }
    }

    @Override
    public <A> Future<SocketAddress> receive(final ByteBuffer dst,
                                             final long timeout,
                                             final TimeUnit unit,
                                             A attachment,
                                             final CompletionHandler<SocketAddress,? super A> handler)
    {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if (timeout < 0L)
            throw new IllegalArgumentException("Negative timeout");
        if (unit == null)
            throw new NullPointerException();

        // complete immediately if channel closed
        if (!isOpen()) {
            CompletedFuture<SocketAddress,A> result = CompletedFuture.withFailure(this,
                new ClosedChannelException(), attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        final AccessControlContext acc = (System.getSecurityManager() == null) ?
            null : AccessController.getContext();
        final PendingFuture<SocketAddress,A> result =
            new PendingFuture<SocketAddress,A>(this, handler, attachment);
        Runnable task = new Runnable() {
            public void run() {
                try {
                    SocketAddress remote = null;
                    long to;
                    if (timeout == 0L) {
                        beginNoTimeoutRead();
                        try {
                            remote = doRestrictedReceive(dst, acc);
                        } finally {
                            endNoTimeoutRead();
                        }
                        to = 0L;
                    } else {
                        to = prepareForTimedRead(result, unit.toMillis(timeout));
                        if (to <= 0L)
                            throw new InterruptedByTimeoutException();
                        remote = doRestrictedReceive(dst, acc);
                    }
                    if (remote == null) {
                        Selector sel = getSelector();
                        SelectionKey sk = null;
                        try {
                            sk = dc.register(sel, SelectionKey.OP_READ);
                            for (;;) {
                                if (!dc.isOpen())
                                    throw new AsynchronousCloseException();
                                if (result.isCancelled())
                                    break;
                                long st = System.currentTimeMillis();
                                int ns = sel.select(to);
                                if (ns > 0) {
                                    remote = doRestrictedReceive(dst, acc);
                                    if (remote != null)
                                        break;
                                }
                                sel.selectedKeys().remove(sk);
                                if (timeout != 0L) {
                                    to -= System.currentTimeMillis() - st;
                                    if (to <= 0)
                                        throw new InterruptedByTimeoutException();
                                }
                            }
                        } finally {
                            if (sk != null)
                                sk.cancel();
                            releaseSelector(sel);
                        }
                    }
                    result.setResult(remote);
                } catch (Throwable x) {
                    if (x instanceof ClosedChannelException)
                        x = new AsynchronousCloseException();
                    result.setFailure(x);
                }
                Invoker.invokeUnchecked(handler, result);
            }
        };
        try {
            group.executeOnPooledThread(task);
        } catch (RejectedExecutionException ree) {
            throw new ShutdownChannelGroupException();
        }
        return result;
    }

    @Override
    public <A> Future<Integer> read(final ByteBuffer dst,
                                    final long timeout,
                                    final TimeUnit unit,
                                    A attachment,
                                    final CompletionHandler<Integer,? super A> handler)
    {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if (timeout < 0L)
            throw new IllegalArgumentException("Negative timeout");
        if (unit == null)
            throw new NullPointerException();
        // another thread may disconnect before read is initiated
        if (!dc.isConnected())
            throw new NotYetConnectedException();

        // complete immediately if channel closed
        if (!isOpen()) {
            CompletedFuture<Integer,A> result = CompletedFuture.withFailure(this,
                new ClosedChannelException(), attachment);
            Invoker.invoke(handler, result);
            return result;
        }

        final PendingFuture<Integer,A> result =
            new PendingFuture<Integer,A>(this, handler, attachment);
        Runnable task = new Runnable() {
            public void run() {
                try {
                    int n = 0;
                    long to;
                    if (timeout == 0L) {
                        beginNoTimeoutRead();
                        try {
                            n = dc.read(dst);
                        } finally {
                            endNoTimeoutRead();
                        }
                        to = 0L;
                    } else {
                        to = prepareForTimedRead(result, unit.toMillis(timeout));
                        if (to <= 0L)
                            throw new InterruptedByTimeoutException();
                        n = dc.read(dst);
                    }
                    if (n == 0) {
                        Selector sel = getSelector();
                        SelectionKey sk = null;
                        try {
                            sk = dc.register(sel, SelectionKey.OP_READ);
                            for (;;) {
                                if (!dc.isOpen())
                                    throw new AsynchronousCloseException();
                                if (result.isCancelled())
                                    break;
                                long st = System.currentTimeMillis();
                                int ns = sel.select(to);
                                if (ns > 0) {
                                    if ((n = dc.read(dst)) != 0)
                                        break;
                                }
                                sel.selectedKeys().remove(sk);
                                if (timeout != 0L) {
                                    to -= System.currentTimeMillis() - st;
                                    if (to <= 0)
                                        throw new InterruptedByTimeoutException();
                                }
                            }
                        } finally {
                            if (sk != null)
                                sk.cancel();
                            releaseSelector(sel);
                        }
                    }
                    result.setResult(n);
                } catch (Throwable x) {
                    if (x instanceof ClosedChannelException)
                        x = new AsynchronousCloseException();
                    result.setFailure(x);
                }
                Invoker.invokeUnchecked(handler, result);
            }
        };
        try {
            group.executeOnPooledThread(task);
        } catch (RejectedExecutionException ree) {
            throw new ShutdownChannelGroupException();
        }
        return result;
    }

    @Override
    public  AsynchronousDatagramChannel bind(SocketAddress local)
        throws IOException
    {
        dc.bind(local);
        return this;
    }

    @Override
    public SocketAddress getLocalAddress() throws IOException {
        return dc.getLocalAddress();
    }

    @Override
    public <T> AsynchronousDatagramChannel setOption(SocketOption<T> name, T value)
        throws IOException
    {
        dc.setOption(name, value);
        return this;
    }

    @Override
    public  <T> T getOption(SocketOption<T> name) throws IOException {
        return dc.getOption(name);
    }

    @Override
    public Set<SocketOption<?>> supportedOptions() {
        return dc.supportedOptions();
    }

    @Override
    public SocketAddress getRemoteAddress() throws IOException {
        return dc.getRemoteAddress();
    }
}