jdk/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java
author alanb
Wed, 06 Apr 2011 20:51:55 +0100
changeset 9237 cbb5753e87e7
parent 5506 202f599c92aa
permissions -rw-r--r--
7034155: (ch) NullPointerException in sun.io.ch.IOUtil when OOM is thrown Reviewed-by: forax

/*
 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.nio.channels.Channel;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.io.FileDescriptor;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.security.PrivilegedAction;
import java.security.AccessController;
import java.security.AccessControlContext;
import sun.security.action.GetIntegerAction;

/**
 * Base implementation of AsynchronousChannelGroup.
 *
 * <p> This class holds the state that is common to the channel group
 * implementations: the associated {@code ThreadPool}, a count of the running
 * handler tasks, an executor with a single core thread that is used to
 * schedule timeouts, the task queue that is used with fixed thread pools,
 * and the shutdown state. Subclasses supply the I/O event handling by
 * implementing the abstract methods declared here.
 */

abstract class AsynchronousChannelGroupImpl
    extends AsynchronousChannelGroup implements Executor
{
    // number of internal threads handling I/O events when using an unbounded
    // thread pool. Internal threads do not dispatch to completion handlers.
    // Read from the "sun.nio.ch.internalThreadPoolSize" property (default 1).
    private static final int internalThreadCount = AccessController.doPrivileged(
        new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));

    // associated thread pool
    private final ThreadPool pool;

    // number of tasks running (including internal)
    private final AtomicInteger threadCount = new AtomicInteger();

    // associated Executor for timeouts; assigned once in the constructor
    private final ScheduledThreadPoolExecutor timeoutExecutor;

    // task queue for when using a fixed thread pool. In that case, thread
    // waiting on I/O events must be awoken to poll tasks from this queue.
    // This is null when the thread pool is not fixed.
    private final Queue<Runnable> taskQueue;

    // group shutdown
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private final Object shutdownNowLock = new Object();
    // set (while holding shutdownNowLock) once termination has been started
    private volatile boolean terminateInitiated;

    /**
     * Initializes a new group that uses the given thread pool.
     *
     * @param   provider
     *          the provider passed to the superclass constructor
     * @param   pool
     *          the thread pool associated with this group
     */
    AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
                                 ThreadPool pool)
    {
        super(provider);
        this.pool = pool;

        if (pool.isFixedThreadPool()) {
            taskQueue = new ConcurrentLinkedQueue<Runnable>();
        } else {
            taskQueue = null;   // not used
        }

        // use default thread factory as thread should not be visible to
        // application (it doesn't execute completion handlers).
        this.timeoutExecutor = (ScheduledThreadPoolExecutor)
            Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
        // cancelled timeout tasks are removed from the work queue immediately
        this.timeoutExecutor.setRemoveOnCancelPolicy(true);
    }

    /**
     * Returns the executor of the associated thread pool.
     */
    final ExecutorService executor() {
        return pool.executor();
    }

    /**
     * Tells whether the associated thread pool is a fixed thread pool.
     */
    final boolean isFixedThreadPool() {
        return pool.isFixedThreadPool();
    }

    /**
     * Returns the pool size for a fixed thread pool; for other thread pools
     * returns the pool size plus the number of internal threads.
     */
    final int fixedThreadCount() {
        if (isFixedThreadPool()) {
            return pool.poolSize();
        } else {
            return pool.poolSize() + internalThreadCount;
        }
    }

    /**
     * Wraps the given task so that the thread executing it is first bound to
     * this group (via {@code Invoker.bindToGroup}) before the task is run.
     */
    private Runnable bindToGroup(final Runnable task) {
        final AsynchronousChannelGroupImpl thisGroup = this;
        return new Runnable() {
            @Override
            public void run() {
                Invoker.bindToGroup(thisGroup);
                task.run();
            }
        };
    }

    /**
     * Creates and starts a thread, in a privileged block, to run the given
     * task. The thread is created with the default thread factory.
     */
    private void startInternalThread(final Runnable task) {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                // internal threads should not be visible to application so
                // cannot use user-supplied thread factory
                ThreadPool.defaultThreadFactory().newThread(task).start();
                return null;
            }
         });
    }

    /**
     * Starts the tasks that handle I/O events. When the thread pool is not
     * fixed the task is first started on {@code internalThreadCount} internal
     * threads. It is then submitted to the thread pool once for each thread
     * in the pool, wrapped so that the pooled thread is bound to this group.
     * The thread count is incremented for each task that is started.
     *
     * @param   task
     *          the task to run on each thread
     */
    protected final void startThreads(Runnable task) {
        if (!isFixedThreadPool()) {
            for (int i=0; i<internalThreadCount; i++) {
                startInternalThread(task);
                threadCount.incrementAndGet();
            }
        }
        if (pool.poolSize() > 0) {
            task = bindToGroup(task);
            try {
                for (int i=0; i<pool.poolSize(); i++) {
                    pool.executor().execute(task);
                    threadCount.incrementAndGet();
                }
            } catch (RejectedExecutionException  x) {
                // nothing we can do; tasks submitted so far remain counted
            }
        }
    }

    /**
     * Returns the number of tasks currently counted as running.
     */
    final int threadCount() {
        return threadCount.get();
    }

    /**
     * Invoked by tasks as they terminate.
     *
     * @param   task
     *          the task to start again when the exiting thread is replaced
     * @param   replaceMe
     *          {@code true} to start a replacement for the exiting thread
     *
     * @return  the thread count; this is the unchanged count when a
     *          replacement was started, otherwise the count after it has
     *          been decremented
     */
    final int threadExit(Runnable task, boolean replaceMe) {
        if (replaceMe) {
            try {
                if (Invoker.isBoundToAnyGroup()) {
                    // submit new task to replace this thread
                    pool.executor().execute(bindToGroup(task));
                } else {
                    // replace internal thread
                    startInternalThread(task);
                }
                return threadCount.get();
            } catch (RejectedExecutionException x) {
                // unable to replace; fall through and count this exit
            }
        }
        return threadCount.decrementAndGet();
    }

    /**
     * Wakes up a thread waiting for I/O events to execute the given task.
     */
    abstract void executeOnHandlerTask(Runnable task);

    /**
     * For a fixed thread pool the task is queued to a thread waiting on I/O
     * events. For other thread pools we simply submit the task to the thread
     * pool.
     */
    final void executeOnPooledThread(Runnable task) {
        if (isFixedThreadPool()) {
            executeOnHandlerTask(task);
        } else {
            pool.executor().execute(bindToGroup(task));
        }
    }

    /**
     * Adds the given task to the task queue. The queue only exists when the
     * group uses a fixed thread pool so this method must not be invoked for
     * other thread pools.
     */
    final void offerTask(Runnable task) {
        taskQueue.offer(task);
    }

    /**
     * Removes and returns the next task from the task queue, or returns
     * {@code null} if the queue is empty or there is no task queue.
     */
    final Runnable pollTask() {
        return (taskQueue == null) ? null : taskQueue.poll();
    }

    /**
     * Schedules the given task on the timeout executor.
     *
     * @return  the future for the scheduled task, or {@code null} if the
     *          task was rejected because termination of the group has been
     *          initiated
     *
     * @throws  AssertionError
     *          if the task is rejected and termination has not been initiated
     */
    final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
        try {
            return timeoutExecutor.schedule(task, timeout, unit);
        } catch (RejectedExecutionException rej) {
            if (terminateInitiated) {
                // no timeout scheduled as group is terminating
                return null;
            }
            throw new AssertionError(rej);
        }
    }

    @Override
    public final boolean isShutdown() {
        return shutdown.get();
    }

    /**
     * The group is terminated when the executor of the associated thread
     * pool has terminated.
     */
    @Override
    public final boolean isTerminated()  {
        return pool.executor().isTerminated();
    }

    /**
     * Returns true if there are no channels in the group
     */
    abstract boolean isEmpty();

    /**
     * Attaches a foreign channel to this group.
     */
    abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
        throws IOException;

    /**
     * Detaches a foreign channel from this group.
     */
    abstract void detachForeignChannel(Object key);

    /**
     * Closes all channels in the group
     */
    abstract void closeAllChannels() throws IOException;

    /**
     * Shutdown all tasks waiting for I/O events.
     */
    abstract void shutdownHandlerTasks();

    /**
     * Shuts down, in a privileged block, the executor of the thread pool and
     * the timeout executor.
     */
    private void shutdownExecutors() {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                pool.executor().shutdown();
                timeoutExecutor.shutdown();
                return null;
            }
        });
    }

    @Override
    public final void shutdown() {
        if (shutdown.getAndSet(true)) {
            // already shutdown
            return;
        }
        // if there are channels in the group then shutdown will continue
        // when the last channel is closed
        if (!isEmpty()) {
            return;
        }
        // initiate termination (acquire shutdownNowLock to ensure that other
        // threads invoking shutdownNow will block).
        synchronized (shutdownNowLock) {
            if (!terminateInitiated) {
                terminateInitiated = true;
                shutdownHandlerTasks();
                shutdownExecutors();
            }
        }
    }

    @Override
    public final void shutdownNow() throws IOException {
        shutdown.set(true);
        synchronized (shutdownNowLock) {
            if (!terminateInitiated) {
                terminateInitiated = true;
                try {
                    closeAllChannels();
                } finally {
                    // terminateInitiated is already set, so no later call
                    // will get here again. The handler tasks and executors
                    // must therefore be shut down even if closing the
                    // channels failed, otherwise the group never terminates.
                    shutdownHandlerTasks();
                    shutdownExecutors();
                }
            }
        }
    }

    /**
     * For use by AsynchronousFileChannel to release resources without shutting
     * down the thread pool.
     *
     * @throws  AssertionError
     *          if the group is already shutdown or is not empty
     */
    final void detachFromThreadPool() {
        if (shutdown.getAndSet(true))
            throw new AssertionError("Already shutdown");
        if (!isEmpty())
            throw new AssertionError("Group not empty");
        shutdownHandlerTasks();
    }

    /**
     * Waits for the executor of the associated thread pool to terminate.
     */
    @Override
    public final boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException
    {
        return pool.executor().awaitTermination(timeout, unit);
    }

    /**
     * Executes the given command on one of the channel group's pooled threads.
     *
     * @throws  NullPointerException
     *          if the task is {@code null}
     */
    @Override
    public final void execute(Runnable task) {
        // fail in the caller, as Executor.execute specifies, rather than
        // wrapping a null task that would only fail later on a pool thread
        if (task == null)
            throw new NullPointerException("'task' is null");

        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            // when a security manager is installed then the user's task
            // must be run with the current calling context
            final AccessControlContext acc = AccessController.getContext();
            final Runnable delegate = task;
            task = new Runnable() {
                @Override
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        @Override
                        public Void run() {
                            delegate.run();
                            return null;
                        }
                    }, acc);
                }
            };
        }
        executeOnPooledThread(task);
    }
}