jdk/src/windows/classes/sun/nio/ch/Iocp.java
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
equal deleted inserted replaced
2056:115e09b7a004 2057:3acf8e5e2ca0
       
     1 /*
       
     2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 
       
    26 package sun.nio.ch;
       
    27 
       
    28 import java.nio.channels.*;
       
    29 import java.nio.channels.spi.AsynchronousChannelProvider;
       
    30 import java.io.Closeable;
       
    31 import java.io.IOException;
       
    32 import java.io.FileDescriptor;
       
    33 import java.util.*;
       
    34 import java.util.concurrent.*;
       
    35 import java.util.concurrent.locks.ReadWriteLock;
       
    36 import java.util.concurrent.locks.ReentrantReadWriteLock;
       
    37 import sun.misc.Unsafe;
       
    38 
       
    39 /**
       
    40  * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
       
    41  * completion port.
       
    42  */
       
    43 
       
    44 class Iocp extends AsynchronousChannelGroupImpl {
       
    45     private static final Unsafe unsafe = Unsafe.getUnsafe();
       
    46     private static final long INVALID_HANDLE_VALUE  = -1L;
       
    47 
       
    48     // maps completion key to channel
       
    49     private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
       
    50     private final Map<Integer,OverlappedChannel> keyToChannel =
       
    51         new HashMap<Integer,OverlappedChannel>();
       
    52     private int nextCompletionKey;
       
    53 
       
    54     // handle to completion port
       
    55     private final long port;
       
    56 
       
    57     // true if port has been closed
       
    58     private boolean closed;
       
    59 
       
    60     // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
       
    61     // relate to I/O operations where the completion notification was not
       
    62     // received in a timely manner after the channel is closed.
       
    63     private final Set<Long> staleIoSet = new HashSet<Long>();
       
    64 
       
    65     Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
       
    66         throws IOException
       
    67     {
       
    68         super(provider, pool);
       
    69         this.port =
       
    70           createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
       
    71         this.nextCompletionKey = 1;
       
    72     }
       
    73 
       
    74     Iocp start() {
       
    75         startThreads(new EventHandlerTask());
       
    76         return this;
       
    77     }
       
    78 
       
    79     /*
       
    80      * Channels implements this interface support overlapped I/O and can be
       
    81      * associated with a completion port.
       
    82      */
       
    83     static interface OverlappedChannel extends Closeable {
       
    84         /**
       
    85          * Returns a reference to the pending I/O result.
       
    86          */
       
    87         <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
       
    88     }
       
    89 
       
    90     // release all resources
       
    91     void implClose() {
       
    92         synchronized (this) {
       
    93             if (closed)
       
    94                 return;
       
    95             closed = true;
       
    96         }
       
    97         close0(port);
       
    98         synchronized (staleIoSet) {
       
    99             for (Long ov: staleIoSet) {
       
   100                 unsafe.freeMemory(ov);
       
   101             }
       
   102             staleIoSet.clear();
       
   103         }
       
   104     }
       
   105 
       
   106     @Override
       
   107     boolean isEmpty() {
       
   108         keyToChannelLock.writeLock().lock();
       
   109         try {
       
   110             return keyToChannel.isEmpty();
       
   111         } finally {
       
   112             keyToChannelLock.writeLock().unlock();
       
   113         }
       
   114     }
       
   115 
       
   116     @Override
       
   117     final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
       
   118         throws IOException
       
   119     {
       
   120         int key = associate(new OverlappedChannel() {
       
   121             public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
       
   122                 return null;
       
   123             }
       
   124             public void close() throws IOException {
       
   125                 channel.close();
       
   126             }
       
   127         }, 0L);
       
   128         return Integer.valueOf(key);
       
   129     }
       
   130 
       
   131     @Override
       
   132     final void detachForeignChannel(Object key) {
       
   133         disassociate((Integer)key);
       
   134     }
       
   135 
       
   136     @Override
       
   137     void closeAllChannels() {
       
   138         /**
       
   139          * On Windows the close operation will close the socket/file handle
       
   140          * and then wait until all outstanding I/O operations have aborted.
       
   141          * This is necessary as each channel's cache of OVERLAPPED structures
       
   142          * can only be freed once all I/O operations have completed. As I/O
       
   143          * completion requires a lookup of the keyToChannel then we must close
       
   144          * the channels when not holding the write lock.
       
   145          */
       
   146         final int MAX_BATCH_SIZE = 32;
       
   147         OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
       
   148         int count;
       
   149         do {
       
   150             // grab a batch of up to 32 channels
       
   151             keyToChannelLock.writeLock().lock();
       
   152             count = 0;
       
   153             try {
       
   154                 for (Integer key: keyToChannel.keySet()) {
       
   155                     channels[count++] = keyToChannel.get(key);
       
   156                     if (count >= MAX_BATCH_SIZE)
       
   157                         break;
       
   158                 }
       
   159             } finally {
       
   160                 keyToChannelLock.writeLock().unlock();
       
   161             }
       
   162 
       
   163             // close them
       
   164             for (int i=0; i<count; i++) {
       
   165                 try {
       
   166                     channels[i].close();
       
   167                 } catch (IOException ignore) { }
       
   168             }
       
   169         } while (count > 0);
       
   170     }
       
   171 
       
   172     private void wakeup() {
       
   173         try {
       
   174             postQueuedCompletionStatus(port, 0);
       
   175         } catch (IOException e) {
       
   176             // should not happen
       
   177             throw new AssertionError(e);
       
   178         }
       
   179     }
       
   180 
       
   181     @Override
       
   182     void executeOnHandlerTask(Runnable task) {
       
   183         synchronized (this) {
       
   184             if (closed)
       
   185                 throw new RejectedExecutionException();
       
   186             offerTask(task);
       
   187             wakeup();
       
   188         }
       
   189 
       
   190     }
       
   191 
       
   192     @Override
       
   193     void shutdownHandlerTasks() {
       
   194         // shutdown all handler threads
       
   195         int nThreads = threadCount();
       
   196         while (nThreads-- > 0) {
       
   197             wakeup();
       
   198         }
       
   199     }
       
   200 
       
   201     /**
       
   202      * Associate the given handle with this group
       
   203      */
       
   204     int associate(OverlappedChannel ch, long handle) throws IOException {
       
   205         keyToChannelLock.writeLock().lock();
       
   206 
       
   207         // generate a completion key (if not shutdown)
       
   208         int key;
       
   209         try {
       
   210             if (isShutdown())
       
   211                 throw new ShutdownChannelGroupException();
       
   212 
       
   213             // generate unique key
       
   214             do {
       
   215                 key = nextCompletionKey++;
       
   216             } while ((key == 0) || keyToChannel.containsKey(key));
       
   217 
       
   218             // associate with I/O completion port
       
   219             if (handle != 0L)
       
   220                 createIoCompletionPort(handle, port, key, 0);
       
   221 
       
   222             // setup mapping
       
   223             keyToChannel.put(key, ch);
       
   224         } finally {
       
   225             keyToChannelLock.writeLock().unlock();
       
   226         }
       
   227         return key;
       
   228     }
       
   229 
       
   230     /**
       
   231      * Disassociate channel from the group.
       
   232      */
       
   233     void disassociate(int key) {
       
   234         boolean checkForShutdown = false;
       
   235 
       
   236         keyToChannelLock.writeLock().lock();
       
   237         try {
       
   238             keyToChannel.remove(key);
       
   239 
       
   240             // last key to be removed so check if group is shutdown
       
   241             if (keyToChannel.isEmpty())
       
   242                 checkForShutdown = true;
       
   243 
       
   244         } finally {
       
   245             keyToChannelLock.writeLock().unlock();
       
   246         }
       
   247 
       
   248         // continue shutdown
       
   249         if (checkForShutdown && isShutdown()) {
       
   250             try {
       
   251                 shutdownNow();
       
   252             } catch (IOException ignore) { }
       
   253         }
       
   254     }
       
   255 
       
   256     /**
       
   257      * Invoked when a channel associated with this port is closed before
       
   258      * notifications for all outstanding I/O operations have been received.
       
   259      */
       
   260     void makeStale(Long overlapped) {
       
   261         synchronized (staleIoSet) {
       
   262             staleIoSet.add(overlapped);
       
   263         }
       
   264     }
       
   265 
       
   266     /**
       
   267      * Checks if the given OVERLAPPED is stale and if so, releases it.
       
   268      */
       
   269     private void checkIfStale(long ov) {
       
   270         synchronized (staleIoSet) {
       
   271             boolean removed = staleIoSet.remove(ov);
       
   272             if (removed) {
       
   273                 unsafe.freeMemory(ov);
       
   274             }
       
   275         }
       
   276     }
       
   277 
       
   278     /**
       
   279      * The handler for consuming the result of an asynchronous I/O operation.
       
   280      */
       
   281     static interface ResultHandler {
       
   282         /**
       
   283          * Invoked if the I/O operation completes successfully.
       
   284          */
       
   285         public void completed(int bytesTransferred);
       
   286 
       
   287         /**
       
   288          * Invoked if the I/O operation fails.
       
   289          */
       
   290         public void failed(int error, IOException ioe);
       
   291     }
       
   292 
       
   293     // Creates IOException for the given I/O error.
       
   294     private static IOException translateErrorToIOException(int error) {
       
   295         String msg = getErrorMessage(error);
       
   296         if (msg == null)
       
   297             msg = "Unknown error: 0x0" + Integer.toHexString(error);
       
   298         return new IOException(msg);
       
   299     }
       
   300 
       
   301     /**
       
   302      * Long-running task servicing system-wide or per-file completion port
       
   303      */
       
   304     private class EventHandlerTask implements Runnable {
       
   305         public void run() {
       
   306             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
       
   307                 Invoker.getGroupAndInvokeCount();
       
   308             CompletionStatus ioResult = new CompletionStatus();
       
   309             boolean replaceMe = false;
       
   310 
       
   311             try {
       
   312                 for (;;) {
       
   313                     // reset invoke count
       
   314                     if (myGroupAndInvokeCount != null)
       
   315                         myGroupAndInvokeCount.resetInvokeCount();
       
   316 
       
   317                     // wait for I/O completion event
       
   318                     // A error here is fatal (thread will not be replaced)
       
   319                     replaceMe = false;
       
   320                     try {
       
   321                         getQueuedCompletionStatus(port, ioResult);
       
   322                     } catch (IOException x) {
       
   323                         // should not happen
       
   324                         x.printStackTrace();
       
   325                         return;
       
   326                     }
       
   327 
       
   328                     // handle wakeup to execute task or shutdown
       
   329                     if (ioResult.completionKey() == 0 &&
       
   330                         ioResult.overlapped() == 0L)
       
   331                     {
       
   332                         Runnable task = pollTask();
       
   333                         if (task == null) {
       
   334                             // shutdown request
       
   335                             return;
       
   336                         }
       
   337 
       
   338                         // run task
       
   339                         // (if error/exception then replace thread)
       
   340                         replaceMe = true;
       
   341                         task.run();
       
   342                         continue;
       
   343                     }
       
   344 
       
   345                     // map key to channel
       
   346                     OverlappedChannel ch = null;
       
   347                     keyToChannelLock.readLock().lock();
       
   348                     try {
       
   349                         ch = keyToChannel.get(ioResult.completionKey());
       
   350                         if (ch == null) {
       
   351                             checkIfStale(ioResult.overlapped());
       
   352                             continue;
       
   353                         }
       
   354                     } finally {
       
   355                         keyToChannelLock.readLock().unlock();
       
   356                     }
       
   357 
       
   358                     // lookup I/O request
       
   359                     PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
       
   360                     if (result == null) {
       
   361                         // we get here if the OVERLAPPED structure is associated
       
   362                         // with an I/O operation on a channel that was closed
       
   363                         // but the I/O operation event wasn't read in a timely
       
   364                         // manner. Alternatively, it may be related to a
       
   365                         // tryLock operation as the OVERLAPPED structures for
       
   366                         // these operations are not in the I/O cache.
       
   367                         checkIfStale(ioResult.overlapped());
       
   368                         continue;
       
   369                     }
       
   370 
       
   371                     // synchronize on result in case I/O completed immediately
       
   372                     // and was handled by initiator
       
   373                     synchronized (result) {
       
   374                         if (result.isDone()) {
       
   375                             continue;
       
   376                         }
       
   377                         // not handled by initiator
       
   378                     }
       
   379 
       
   380                     // invoke I/O result handler
       
   381                     int error = ioResult.error();
       
   382                     ResultHandler rh = (ResultHandler)result.getContext();
       
   383                     replaceMe = true; // (if error/exception then replace thread)
       
   384                     if (error == 0) {
       
   385                         rh.completed(ioResult.bytesTransferred());
       
   386                     } else {
       
   387                         rh.failed(error, translateErrorToIOException(error));
       
   388                     }
       
   389                 }
       
   390             } finally {
       
   391                 // last thread to exit when shutdown releases resources
       
   392                 int remaining = threadExit(this, replaceMe);
       
   393                 if (remaining == 0 && isShutdown()) {
       
   394                     implClose();
       
   395                 }
       
   396             }
       
   397         }
       
   398     }
       
   399 
       
   400     /**
       
   401      * Container for data returned by GetQueuedCompletionStatus
       
   402      */
       
   403     private static class CompletionStatus {
       
   404         private int error;
       
   405         private int bytesTransferred;
       
   406         private int completionKey;
       
   407         private long overlapped;
       
   408 
       
   409         private CompletionStatus() { }
       
   410         int error() { return error; }
       
   411         int bytesTransferred() { return bytesTransferred; }
       
   412         int completionKey() { return completionKey; }
       
   413         long overlapped() { return overlapped; }
       
   414     }
       
   415 
       
   416     // -- native methods --
       
   417 
       
   418     private static native void initIDs();
       
   419 
       
   420     private static native long createIoCompletionPort(long handle,
       
   421         long existingPort, int completionKey, int concurrency) throws IOException;
       
   422 
       
   423     private static native void close0(long handle);
       
   424 
       
   425     private static native void getQueuedCompletionStatus(long completionPort,
       
   426         CompletionStatus status) throws IOException;
       
   427 
       
   428     private static native void postQueuedCompletionStatus(long completionPort,
       
   429         int completionKey) throws IOException;
       
   430 
       
   431     private static native String getErrorMessage(int error);
       
   432 
       
   433     static {
       
   434         Util.load();
       
   435         initIDs();
       
   436     }
       
   437 }