jdk/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
equal deleted inserted replaced
2056:115e09b7a004 2057:3acf8e5e2ca0
       
     1 /*
       
     2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 
       
    26 package sun.nio.ch;
       
    27 
       
    28 import java.nio.channels.Channel;
       
    29 import java.nio.channels.AsynchronousChannelGroup;
       
    30 import java.nio.channels.spi.AsynchronousChannelProvider;
       
    31 import java.io.IOException;
       
    32 import java.io.FileDescriptor;
       
    33 import java.util.Queue;
       
    34 import java.util.concurrent.*;
       
    35 import java.util.concurrent.locks.*;
       
    36 import java.util.concurrent.atomic.AtomicInteger;
       
    37 import java.security.PrivilegedAction;
       
    38 import java.security.AccessController;
       
    39 import java.security.AccessControlContext;
       
    40 import sun.security.action.GetIntegerAction;
       
    41 
       
    42 /**
       
    43  * Base implementation of AsynchronousChannelGroup
       
    44  */
       
    45 
       
    46 abstract class AsynchronousChannelGroupImpl
       
    47     extends AsynchronousChannelGroup implements Executor
       
    48 {
       
    49     // number of internal threads handling I/O events when using an unbounded
       
    50     // thread pool. Internal threads do not dispatch to completion handlers.
       
    51     private static final int internalThreadCount = AccessController.doPrivileged(
       
    52         new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));
       
    53 
       
    54     // associated thread pool
       
    55     private final ThreadPool pool;
       
    56 
       
    57     // number of tasks running (including internal)
       
    58     private final AtomicInteger threadCount = new AtomicInteger();
       
    59 
       
    60     // associated Executor for timeouts
       
    61     private ScheduledThreadPoolExecutor timeoutExecutor;
       
    62 
       
    63     // task queue for when using a fixed thread pool. In that case, thread
       
    64     // waiting on I/O events must be awokon to poll tasks from this queue.
       
    65     private final Queue<Runnable> taskQueue;
       
    66 
       
    67     // group shutdown
       
    68     // shutdownLock is RW lock so as to allow for concurrent queuing of tasks
       
    69     // when using a fixed thread pool.
       
    70     private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
       
    71     private final Object shutdownNowLock = new Object();
       
    72     private volatile boolean shutdown;
       
    73     private volatile boolean terminateInitiated;
       
    74 
       
    /**
     * Creates a group for the given provider that uses the given thread pool.
     *
     * @param provider the provider handed to the superclass constructor
     * @param pool     the thread pool associated with this group
     */
    AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
                                 ThreadPool pool)
    {
        super(provider);
        this.pool = pool;

        // the task queue is only created for a fixed thread pool
        if (pool.isFixedThreadPool()) {
            taskQueue = new ConcurrentLinkedQueue<Runnable>();
        } else {
            taskQueue = null;   // not used
        }

        // use default thread factory as thread should not be visible to
        // application (it doesn't execute completion handlers). The executor
        // is constructed directly instead of down-casting the result of
        // Executors.newScheduledThreadPool, whose declared return type is
        // only ScheduledExecutorService.
        this.timeoutExecutor =
            new ScheduledThreadPoolExecutor(1, ThreadPool.defaultThreadFactory());
        this.timeoutExecutor.setRemoveOnCancelPolicy(true);
    }
       
    93 
       
    94     final ExecutorService executor() {
       
    95         return pool.executor();
       
    96     }
       
    97 
       
    98     final boolean isFixedThreadPool() {
       
    99         return pool.isFixedThreadPool();
       
   100     }
       
   101 
       
   102     final int fixedThreadCount() {
       
   103         if (isFixedThreadPool()) {
       
   104             return pool.poolSize();
       
   105         } else {
       
   106             return pool.poolSize() + internalThreadCount;
       
   107         }
       
   108     }
       
   109 
       
   110     private Runnable bindToGroup(final Runnable task) {
       
   111         final AsynchronousChannelGroupImpl thisGroup = this;
       
   112         return new Runnable() {
       
   113             public void run() {
       
   114                 Invoker.bindToGroup(thisGroup);
       
   115                 task.run();
       
   116             }
       
   117         };
       
   118     }
       
   119 
       
   120     private void startInternalThread(final Runnable task) {
       
   121         AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   122             @Override
       
   123             public Void run() {
       
   124                 // internal threads should not be visible to application so
       
   125                 // cannot use user-supplied thread factory
       
   126                 ThreadPool.defaultThreadFactory().newThread(task).start();
       
   127                 return null;
       
   128             }
       
   129          });
       
   130     }
       
   131 
       
   132     protected final void startThreads(Runnable task) {
       
   133         if (!isFixedThreadPool()) {
       
   134             for (int i=0; i<internalThreadCount; i++) {
       
   135                 startInternalThread(task);
       
   136                 threadCount.incrementAndGet();
       
   137             }
       
   138         }
       
   139         if (pool.poolSize() > 0) {
       
   140             task = bindToGroup(task);
       
   141             try {
       
   142                 for (int i=0; i<pool.poolSize(); i++) {
       
   143                     pool.executor().execute(task);
       
   144                     threadCount.incrementAndGet();
       
   145                 }
       
   146             } catch (RejectedExecutionException  x) {
       
   147                 // nothing we can do
       
   148             }
       
   149         }
       
   150     }
       
   151 
       
   152     final int threadCount() {
       
   153         return threadCount.get();
       
   154     }
       
   155 
       
   156     /**
       
   157      * Invoked by tasks as they terminate
       
   158      */
       
   159     final int threadExit(Runnable task, boolean replaceMe) {
       
   160         if (replaceMe) {
       
   161             try {
       
   162                 if (Invoker.isBoundToAnyGroup()) {
       
   163                     // submit new task to replace this thread
       
   164                     pool.executor().execute(bindToGroup(task));
       
   165                 } else {
       
   166                     // replace internal thread
       
   167                     startInternalThread(task);
       
   168                 }
       
   169                 return threadCount.get();
       
   170             } catch (RejectedExecutionException x) {
       
   171                 // unable to replace
       
   172             }
       
   173         }
       
   174         return threadCount.decrementAndGet();
       
   175     }
       
   176 
       
    /**
     * Wakes up a thread waiting for I/O events to execute the given task.
     * executeOnPooledThread calls this when the thread pool is fixed.
     */
    abstract void executeOnHandlerTask(Runnable task);
       
   181 
       
   182     /**
       
   183      * For a fixed thread pool the task is queued to a thread waiting on I/O
       
   184      * events. For other thread pools we simply submit the task to the thread
       
   185      * pool.
       
   186      */
       
   187     final void executeOnPooledThread(Runnable task) {
       
   188         if (isFixedThreadPool()) {
       
   189             executeOnHandlerTask(task);
       
   190         } else {
       
   191             pool.executor().execute(bindToGroup(task));
       
   192         }
       
   193     }
       
   194 
       
   195     final void offerTask(Runnable task) {
       
   196         taskQueue.offer(task);
       
   197     }
       
   198 
       
   199     final Runnable pollTask() {
       
   200         return (taskQueue == null) ? null : taskQueue.poll();
       
   201     }
       
   202 
       
   203     final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
       
   204         try {
       
   205             return timeoutExecutor.schedule(task, timeout, unit);
       
   206         } catch (RejectedExecutionException rej) {
       
   207             if (terminateInitiated) {
       
   208                 // no timeout scheduled as group is terminating
       
   209                 return null;
       
   210             }
       
   211             throw new AssertionError(rej);
       
   212         }
       
   213     }
       
   214 
       
   215     @Override
       
   216     public final boolean isShutdown() {
       
   217         return shutdown;
       
   218     }
       
   219 
       
   220     @Override
       
   221     public final boolean isTerminated()  {
       
   222         return pool.executor().isTerminated();
       
   223     }
       
   224 
       
    /**
     * Returns true if there are no channels in the group. shutdown() uses
     * this to decide whether termination can be initiated immediately.
     */
    abstract boolean isEmpty();
       
   229 
       
    /**
     * Attaches a foreign channel to this group.
     *
     * NOTE(review): the returned object is presumably the key later passed
     * to detachForeignChannel, and fdo presumably the channel's file
     * descriptor; confirm against the implementations.
     *
     * @throws IOException declared by this method; the conditions are
     *         defined by the implementations
     */
    abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
        throws IOException;
       
   235 
       
    /**
     * Detaches a foreign channel from this group.
     *
     * NOTE(review): key is presumably the object returned by
     * attachForeignChannel; confirm against the implementations.
     */
    abstract void detachForeignChannel(Object key);
       
   240 
       
    /**
     * Closes all channels in the group. Invoked by shutdownNow() while
     * holding shutdownNowLock, before the handler tasks and executors are
     * shut down.
     *
     * @throws IOException declared by this method; shutdownNow() lets it
     *         propagate to its caller
     */
    abstract void closeAllChannels() throws IOException;
       
   245 
       
    /**
     * Shutdown all tasks waiting for I/O events. Invoked by shutdown() and
     * shutdownNow() while holding shutdownNowLock, immediately before the
     * executors are shut down.
     */
    abstract void shutdownHandlerTasks();
       
   250 
       
   251     private void shutdownExecutors() {
       
   252         AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   253             public Void run() {
       
   254                 pool.executor().shutdown();
       
   255                 timeoutExecutor.shutdown();
       
   256                 return null;
       
   257             }
       
   258         });
       
   259     }
       
   260 
       
   261     @Override
       
   262     public final void shutdown() {
       
   263         shutdownLock.writeLock().lock();
       
   264         try {
       
   265             if (shutdown) {
       
   266                 // already shutdown
       
   267                 return;
       
   268             }
       
   269             shutdown = true;
       
   270         } finally {
       
   271             shutdownLock.writeLock().unlock();
       
   272         }
       
   273 
       
   274         // if there are channels in the group then shutdown will continue
       
   275         // when the last channel is closed
       
   276         if (!isEmpty()) {
       
   277             return;
       
   278         }
       
   279         // initiate termination (acquire shutdownNowLock to ensure that other
       
   280         // threads invoking shutdownNow will block).
       
   281         synchronized (shutdownNowLock) {
       
   282             if (!terminateInitiated) {
       
   283                 terminateInitiated = true;
       
   284                 shutdownHandlerTasks();
       
   285                 shutdownExecutors();
       
   286             }
       
   287         }
       
   288     }
       
   289 
       
   290     @Override
       
   291     public final void shutdownNow() throws IOException {
       
   292         shutdownLock.writeLock().lock();
       
   293         try {
       
   294             shutdown = true;
       
   295         } finally {
       
   296             shutdownLock.writeLock().unlock();
       
   297         }
       
   298         synchronized (shutdownNowLock) {
       
   299             if (!terminateInitiated) {
       
   300                 terminateInitiated = true;
       
   301                 closeAllChannels();
       
   302                 shutdownHandlerTasks();
       
   303                 shutdownExecutors();
       
   304             }
       
   305         }
       
   306     }
       
   307 
       
   308     @Override
       
   309     public final boolean awaitTermination(long timeout, TimeUnit unit)
       
   310         throws InterruptedException
       
   311     {
       
   312         return pool.executor().awaitTermination(timeout, unit);
       
   313     }
       
   314 
       
   315     /**
       
   316      * Executes the given command on one of the channel group's pooled threads.
       
   317      */
       
   318     @Override
       
   319     public final void execute(Runnable task) {
       
   320         SecurityManager sm = System.getSecurityManager();
       
   321         if (sm != null) {
       
   322             // when a security manager is installed then the user's task
       
   323             // must be run with the current calling context
       
   324             final AccessControlContext acc = AccessController.getContext();
       
   325             final Runnable delegate = task;
       
   326             task = new Runnable() {
       
   327                 @Override
       
   328                 public void run() {
       
   329                     AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   330                         @Override
       
   331                         public Void run() {
       
   332                             delegate.run();
       
   333                             return null;
       
   334                         }
       
   335                     }, acc);
       
   336                 }
       
   337             };
       
   338         }
       
   339         executeOnPooledThread(task);
       
   340     }
       
   341 }