8080225: FileInput/OutputStream/FileChannel cleanup should be improved
Reviewed-by: mchung, plevart, bpb
/*
* Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.ch;
import java.nio.channels.Channel;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.io.FileDescriptor;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.security.PrivilegedAction;
import java.security.AccessController;
import java.security.AccessControlContext;
import sun.security.action.GetIntegerAction;
/**
* Base implementation of AsynchronousChannelGroup
*/
abstract class AsynchronousChannelGroupImpl
extends AsynchronousChannelGroup implements Executor
{
// number of internal threads handling I/O events when using an unbounded
// thread pool. Internal threads do not dispatch to completion handlers.
private static final int internalThreadCount = AccessController.doPrivileged(
new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));
// associated thread pool
private final ThreadPool pool;
// number of tasks running (including internal)
private final AtomicInteger threadCount = new AtomicInteger();
// associated Executor for timeouts
private ScheduledThreadPoolExecutor timeoutExecutor;
// task queue for when using a fixed thread pool. In that case, thread
// waiting on I/O events must be awokon to poll tasks from this queue.
private final Queue<Runnable> taskQueue;
// group shutdown
private final AtomicBoolean shutdown = new AtomicBoolean();
private final Object shutdownNowLock = new Object();
private volatile boolean terminateInitiated;
AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
ThreadPool pool)
{
super(provider);
this.pool = pool;
if (pool.isFixedThreadPool()) {
taskQueue = new ConcurrentLinkedQueue<>();
} else {
taskQueue = null; // not used
}
// use default thread factory as thread should not be visible to
// application (it doesn't execute completion handlers).
this.timeoutExecutor = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
this.timeoutExecutor.setRemoveOnCancelPolicy(true);
}
final ExecutorService executor() {
return pool.executor();
}
final boolean isFixedThreadPool() {
return pool.isFixedThreadPool();
}
final int fixedThreadCount() {
if (isFixedThreadPool()) {
return pool.poolSize();
} else {
return pool.poolSize() + internalThreadCount;
}
}
private Runnable bindToGroup(final Runnable task) {
final AsynchronousChannelGroupImpl thisGroup = this;
return new Runnable() {
public void run() {
Invoker.bindToGroup(thisGroup);
task.run();
}
};
}
private void startInternalThread(final Runnable task) {
AccessController.doPrivileged(new PrivilegedAction<>() {
@Override
public Void run() {
// internal threads should not be visible to application so
// cannot use user-supplied thread factory
ThreadPool.defaultThreadFactory().newThread(task).start();
return null;
}
});
}
protected final void startThreads(Runnable task) {
if (!isFixedThreadPool()) {
for (int i=0; i<internalThreadCount; i++) {
startInternalThread(task);
threadCount.incrementAndGet();
}
}
if (pool.poolSize() > 0) {
task = bindToGroup(task);
try {
for (int i=0; i<pool.poolSize(); i++) {
pool.executor().execute(task);
threadCount.incrementAndGet();
}
} catch (RejectedExecutionException x) {
// nothing we can do
}
}
}
final int threadCount() {
return threadCount.get();
}
/**
* Invoked by tasks as they terminate
*/
final int threadExit(Runnable task, boolean replaceMe) {
if (replaceMe) {
try {
if (Invoker.isBoundToAnyGroup()) {
// submit new task to replace this thread
pool.executor().execute(bindToGroup(task));
} else {
// replace internal thread
startInternalThread(task);
}
return threadCount.get();
} catch (RejectedExecutionException x) {
// unable to replace
}
}
return threadCount.decrementAndGet();
}
/**
* Wakes up a thread waiting for I/O events to execute the given task.
*/
abstract void executeOnHandlerTask(Runnable task);
/**
* For a fixed thread pool the task is queued to a thread waiting on I/O
* events. For other thread pools we simply submit the task to the thread
* pool.
*/
final void executeOnPooledThread(Runnable task) {
if (isFixedThreadPool()) {
executeOnHandlerTask(task);
} else {
pool.executor().execute(bindToGroup(task));
}
}
final void offerTask(Runnable task) {
taskQueue.offer(task);
}
final Runnable pollTask() {
return (taskQueue == null) ? null : taskQueue.poll();
}
final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
try {
return timeoutExecutor.schedule(task, timeout, unit);
} catch (RejectedExecutionException rej) {
if (terminateInitiated) {
// no timeout scheduled as group is terminating
return null;
}
throw new AssertionError(rej);
}
}
@Override
public final boolean isShutdown() {
return shutdown.get();
}
@Override
public final boolean isTerminated() {
return pool.executor().isTerminated();
}
/**
* Returns true if there are no channels in the group
*/
abstract boolean isEmpty();
/**
* Attaches a foreign channel to this group.
*/
abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
throws IOException;
/**
* Detaches a foreign channel from this group.
*/
abstract void detachForeignChannel(Object key);
/**
* Closes all channels in the group
*/
abstract void closeAllChannels() throws IOException;
/**
* Shutdown all tasks waiting for I/O events.
*/
abstract void shutdownHandlerTasks();
private void shutdownExecutors() {
AccessController.doPrivileged(
new PrivilegedAction<>() {
public Void run() {
pool.executor().shutdown();
timeoutExecutor.shutdown();
return null;
}
},
null,
new RuntimePermission("modifyThread"));
}
@Override
public final void shutdown() {
if (shutdown.getAndSet(true)) {
// already shutdown
return;
}
// if there are channels in the group then shutdown will continue
// when the last channel is closed
if (!isEmpty()) {
return;
}
// initiate termination (acquire shutdownNowLock to ensure that other
// threads invoking shutdownNow will block).
synchronized (shutdownNowLock) {
if (!terminateInitiated) {
terminateInitiated = true;
shutdownHandlerTasks();
shutdownExecutors();
}
}
}
@Override
public final void shutdownNow() throws IOException {
shutdown.set(true);
synchronized (shutdownNowLock) {
if (!terminateInitiated) {
terminateInitiated = true;
closeAllChannels();
shutdownHandlerTasks();
shutdownExecutors();
}
}
}
/**
* For use by AsynchronousFileChannel to release resources without shutting
* down the thread pool.
*/
final void detachFromThreadPool() {
if (shutdown.getAndSet(true))
throw new AssertionError("Already shutdown");
if (!isEmpty())
throw new AssertionError("Group not empty");
shutdownHandlerTasks();
}
@Override
public final boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException
{
return pool.executor().awaitTermination(timeout, unit);
}
/**
* Executes the given command on one of the channel group's pooled threads.
*/
@Override
public final void execute(Runnable task) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// when a security manager is installed then the user's task
// must be run with the current calling context
final AccessControlContext acc = AccessController.getContext();
final Runnable delegate = task;
task = new Runnable() {
@Override
public void run() {
AccessController.doPrivileged(new PrivilegedAction<>() {
@Override
public Void run() {
delegate.run();
return null;
}
}, acc);
}
};
}
executeOnPooledThread(task);
}
}