jdk/src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java
author simonis
Mon, 20 Jan 2014 09:24:25 +0100
changeset 22604 9b394795e216
parent 19607 bee007586d06
child 23010 6dadb192ad81
permissions -rw-r--r--
8031997: PPC64: Make the various POLL constants system dependant Reviewed-by: alanb

/*
 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.nio.ByteBuffer;
import java.net.*;
import java.util.concurrent.*;
import java.io.IOException;
import java.io.FileDescriptor;
import java.security.AccessController;
import sun.net.NetHooks;
import sun.security.action.GetPropertyAction;

/**
 * Unix implementation of AsynchronousSocketChannel
 */

class UnixAsynchronousSocketChannelImpl
    extends AsynchronousSocketChannelImpl implements Port.PollableChannel
{
    private final static NativeDispatcher nd = new SocketDispatcher();
    private static enum OpType { CONNECT, READ, WRITE };

    private static final boolean disableSynchronousRead;
    static {
        String propValue = AccessController.doPrivileged(
            new GetPropertyAction("sun.nio.ch.disableSynchronousRead", "false"));
        disableSynchronousRead = (propValue.length() == 0) ?
            true : Boolean.valueOf(propValue);
    }

    private final Port port;
    private final int fdVal;

    // used to ensure that the context for I/O operations that complete
    // ascynrhonously is visible to the pooled threads handling I/O events.
    private final Object updateLock = new Object();

    // pending connect (updateLock)
    private boolean connectPending;
    private CompletionHandler<Void,Object> connectHandler;
    private Object connectAttachment;
    private PendingFuture<Void,Object> connectFuture;

    // pending remote address (stateLock)
    private SocketAddress pendingRemote;

    // pending read (updateLock)
    private boolean readPending;
    private boolean isScatteringRead;
    private ByteBuffer readBuffer;
    private ByteBuffer[] readBuffers;
    private CompletionHandler<Number,Object> readHandler;
    private Object readAttachment;
    private PendingFuture<Number,Object> readFuture;
    private Future<?> readTimer;

    // pending write (updateLock)
    private boolean writePending;
    private boolean isGatheringWrite;
    private ByteBuffer writeBuffer;
    private ByteBuffer[] writeBuffers;
    private CompletionHandler<Number,Object> writeHandler;
    private Object writeAttachment;
    private PendingFuture<Number,Object> writeFuture;
    private Future<?> writeTimer;


    UnixAsynchronousSocketChannelImpl(Port port)
        throws IOException
    {
        super(port);

        // set non-blocking
        try {
            IOUtil.configureBlocking(fd, false);
        } catch (IOException x) {
            nd.close(fd);
            throw x;
        }

        this.port = port;
        this.fdVal = IOUtil.fdVal(fd);

        // add mapping from file descriptor to this channel
        port.register(fdVal, this);
    }

    // Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl
    UnixAsynchronousSocketChannelImpl(Port port,
                                      FileDescriptor fd,
                                      InetSocketAddress remote)
        throws IOException
    {
        super(port, fd, remote);

        this.fdVal = IOUtil.fdVal(fd);
        IOUtil.configureBlocking(fd, false);

        try {
            port.register(fdVal, this);
        } catch (ShutdownChannelGroupException x) {
            // ShutdownChannelGroupException thrown if we attempt to register a
            // new channel after the group is shutdown
            throw new IOException(x);
        }

        this.port = port;
    }

    @Override
    public AsynchronousChannelGroupImpl group() {
        return port;
    }

    // register events for outstanding I/O operations, caller already owns updateLock
    private void updateEvents() {
        assert Thread.holdsLock(updateLock);
        int events = 0;
        if (readPending)
            events |= Net.POLLIN;
        if (connectPending || writePending)
            events |= Net.POLLOUT;
        if (events != 0)
            port.startPoll(fdVal, events);
    }

    // register events for outstanding I/O operations
    private void lockAndUpdateEvents() {
        synchronized (updateLock) {
            updateEvents();
        }
    }

    // invoke to finish read and/or write operations
    private void finish(boolean mayInvokeDirect,
                        boolean readable,
                        boolean writable)
    {
        boolean finishRead = false;
        boolean finishWrite = false;
        boolean finishConnect = false;

        // map event to pending result
        synchronized (updateLock) {
            if (readable && this.readPending) {
                this.readPending = false;
                finishRead = true;
            }
            if (writable) {
                if (this.writePending) {
                    this.writePending = false;
                    finishWrite = true;
                } else if (this.connectPending) {
                    this.connectPending = false;
                    finishConnect = true;
                }
            }
        }

        // complete the I/O operation. Special case for when channel is
        // ready for both reading and writing. In that case, submit task to
        // complete write if write operation has a completion handler.
        if (finishRead) {
            if (finishWrite)
                finishWrite(false);
            finishRead(mayInvokeDirect);
            return;
        }
        if (finishWrite) {
            finishWrite(mayInvokeDirect);
        }
        if (finishConnect) {
            finishConnect(mayInvokeDirect);
        }
    }

    /**
     * Invoked by event handler thread when file descriptor is polled
     */
    @Override
    public void onEvent(int events, boolean mayInvokeDirect) {
        boolean readable = (events & Net.POLLIN) > 0;
        boolean writable = (events & Net.POLLOUT) > 0;
        if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
            readable = true;
            writable = true;
        }
        finish(mayInvokeDirect, readable, writable);
    }

    @Override
    void implClose() throws IOException {
        // remove the mapping
        port.unregister(fdVal);

        // close file descriptor
        nd.close(fd);

        // All outstanding I/O operations are required to fail
        finish(false, true, true);
    }

    @Override
    public void onCancel(PendingFuture<?,?> task) {
        if (task.getContext() == OpType.CONNECT)
            killConnect();
        if (task.getContext() == OpType.READ)
            killReading();
        if (task.getContext() == OpType.WRITE)
            killWriting();
    }

    // -- connect --

    private void setConnected() throws IOException {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            localAddress = Net.localAddress(fd);
            remoteAddress = (InetSocketAddress)pendingRemote;
        }
    }

    private void finishConnect(boolean mayInvokeDirect) {
        Throwable e = null;
        try {
            begin();
            checkConnect(fdVal);
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }
        if (e != null) {
            // close channel if connection cannot be established
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }

        // invoke handler and set result
        CompletionHandler<Void,Object> handler = connectHandler;
        Object att = connectAttachment;
        PendingFuture<Void,Object> future = connectFuture;
        if (handler == null) {
            future.setResult(null, e);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, null, e);
            } else {
                Invoker.invokeIndirectly(this, handler, att, null, e);
            }
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    <A> Future<Void> implConnect(SocketAddress remote,
                                 A attachment,
                                 CompletionHandler<Void,? super A> handler)
    {
        if (!isOpen()) {
            Throwable e = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(e);
            } else {
                Invoker.invoke(this, handler, attachment, null, e);
                return null;
            }
        }

        InetSocketAddress isa = Net.checkAddress(remote);

        // permission check
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        // check and set state
        boolean notifyBeforeTcpConnect;
        synchronized (stateLock) {
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_PENDING)
                throw new ConnectionPendingException();
            state = ST_PENDING;
            pendingRemote = remote;
            notifyBeforeTcpConnect = (localAddress == null);
        }

        Throwable e = null;
        try {
            begin();
            // notify hook if unbound
            if (notifyBeforeTcpConnect)
                NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
            int n = Net.connect(fd, isa.getAddress(), isa.getPort());
            if (n == IOStatus.UNAVAILABLE) {
                // connection could not be established immediately
                PendingFuture<Void,A> result = null;
                synchronized (updateLock) {
                    if (handler == null) {
                        result = new PendingFuture<Void,A>(this, OpType.CONNECT);
                        this.connectFuture = (PendingFuture<Void,Object>)result;
                    } else {
                        this.connectHandler = (CompletionHandler<Void,Object>)handler;
                        this.connectAttachment = attachment;
                    }
                    this.connectPending = true;
                    updateEvents();
                }
                return result;
            }
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }

        // close channel if connect fails
        if (e != null) {
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }
        if (handler == null) {
            return CompletedFuture.withResult(null, e);
        } else {
            Invoker.invoke(this, handler, attachment, null, e);
            return null;
        }
    }

    // -- read --

    private void finishRead(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after reading is re-enabled.
        boolean scattering = isScatteringRead;
        CompletionHandler<Number,Object> handler = readHandler;
        Object att = readAttachment;
        PendingFuture<Number,Object> future = readFuture;
        Future<?> timeout = readTimer;

        try {
            begin();

            if (scattering) {
                n = (int)IOUtil.read(fd, readBuffers, nd);
            } else {
                n = IOUtil.read(fd, readBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                synchronized (updateLock) {
                    readPending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.readBuffer = null;
            this.readBuffers = null;
            this.readAttachment = null;

            // allow another read to be initiated
            enableReading();

        } catch (Throwable x) {
            enableReading();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent write
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timeout != null)
            timeout.cancel(false);

        // create result
        Number result = (exc != null) ? null : (scattering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    private Runnable readTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                if (!readPending)
                    return;
                readPending = false;
                handler = readHandler;
                att = readAttachment;
                future = readFuture;
            }

            // kill further reading before releasing waiters
            enableReading(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler == null) {
                future.setFailure(exc);
            } else {
                AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
                Invoker.invokeIndirectly(ch, handler, att, null, exc);
            }
        }
    };

    /**
     * Initiates a read or scattering read operation
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                            ByteBuffer dst,
                                            ByteBuffer[] dsts,
                                            long timeout,
                                            TimeUnit unit,
                                            A attachment,
                                            CompletionHandler<V,? super A> handler)
    {
        // A synchronous read is not attempted if disallowed by system property
        // or, we are using a fixed thread pool and the completion handler may
        // not be invoked directly (because the thread is not a pooled thread or
        // there are too many handlers on the stack).
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
        boolean invokeDirect = false;
        boolean attemptRead = false;
        if (!disableSynchronousRead) {
            if (handler == null) {
                attemptRead = true;
            } else {
                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
                // okay to attempt read with user thread pool
                attemptRead = invokeDirect || !port.isFixedThreadPool();
            }
        }

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptRead) {
                if (isScatteringRead) {
                    n = (int)IOUtil.read(fd, dsts, nd);
                } else {
                    n = IOUtil.read(fd, dst, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isScatteringRead = isScatteringRead;
                    this.readBuffer = dst;
                    this.readBuffers = dsts;
                    if (handler == null) {
                        this.readHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.READ);
                        this.readFuture = (PendingFuture<Number,Object>)result;
                        this.readAttachment = null;
                    } else {
                        this.readHandler = (CompletionHandler<Number,Object>)handler;
                        this.readAttachment = attachment;
                        this.readFuture = null;
                    }
                    if (timeout > 0L) {
                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                    }
                    this.readPending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            if (!pending)
                enableReading();
            end();
        }

        Number result = (exc != null) ? null : (isScatteringRead) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // read completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- write --

    private void finishWrite(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after reading is re-enabled.
        boolean gathering = this.isGatheringWrite;
        CompletionHandler<Number,Object> handler = this.writeHandler;
        Object att = this.writeAttachment;
        PendingFuture<Number,Object> future = this.writeFuture;
        Future<?> timer = this.writeTimer;

        try {
            begin();

            if (gathering) {
                n = (int)IOUtil.write(fd, writeBuffers, nd);
            } else {
                n = IOUtil.write(fd, writeBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                synchronized (updateLock) {
                    writePending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.writeBuffer = null;
            this.writeBuffers = null;
            this.writeAttachment = null;

            // allow another write to be initiated
            enableWriting();

        } catch (Throwable x) {
            enableWriting();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent write
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timer != null)
            timer.cancel(false);

        // create result
        Number result = (exc != null) ? null : (gathering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    private Runnable writeTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                if (!writePending)
                    return;
                writePending = false;
                handler = writeHandler;
                att = writeAttachment;
                future = writeFuture;
            }

            // kill further writing before releasing waiters
            enableWriting(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler != null) {
                Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
                    handler, att, null, exc);
            } else {
                future.setFailure(exc);
            }
        }
    };

    /**
     * Initiates a read or scattering read operation
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                             ByteBuffer src,
                                             ByteBuffer[] srcs,
                                             long timeout,
                                             TimeUnit unit,
                                             A attachment,
                                             CompletionHandler<V,? super A> handler)
    {
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
            Invoker.getGroupAndInvokeCount();
        boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
        boolean attemptWrite = (handler == null) || invokeDirect ||
            !port.isFixedThreadPool();  // okay to attempt write with user thread pool

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptWrite) {
                if (isGatheringWrite) {
                    n = (int)IOUtil.write(fd, srcs, nd);
                } else {
                    n = IOUtil.write(fd, src, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isGatheringWrite = isGatheringWrite;
                    this.writeBuffer = src;
                    this.writeBuffers = srcs;
                    if (handler == null) {
                        this.writeHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.WRITE);
                        this.writeFuture = (PendingFuture<Number,Object>)result;
                        this.writeAttachment = null;
                    } else {
                        this.writeHandler = (CompletionHandler<Number,Object>)handler;
                        this.writeAttachment = attachment;
                        this.writeFuture = null;
                    }
                    if (timeout > 0L) {
                        this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
                    }
                    this.writePending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            if (!pending)
                enableWriting();
            end();
        }

        Number result = (exc != null) ? null : (isGatheringWrite) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // write completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- Native methods --

    private static native void checkConnect(int fdVal) throws IOException;

    static {
        IOUtil.load();
    }
}