jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
equal deleted inserted replaced
2056:115e09b7a004 2057:3acf8e5e2ca0
       
     1 /*
       
     2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 
       
    26 package sun.nio.ch;
       
    27 
       
    28 import java.nio.channels.*;
       
    29 import java.net.InetSocketAddress;
       
    30 import java.util.concurrent.Future;
       
    31 import java.util.concurrent.atomic.AtomicBoolean;
       
    32 import java.io.IOException;
       
    33 import java.security.AccessControlContext;
       
    34 import java.security.AccessController;
       
    35 import java.security.PrivilegedAction;
       
    36 import sun.misc.Unsafe;
       
    37 
       
    38 /**
       
    39  * Windows implementation of AsynchronousServerSocketChannel using overlapped I/O.
       
    40  */
       
    41 
       
    42 class WindowsAsynchronousServerSocketChannelImpl
       
    43     extends AsynchronousServerSocketChannelImpl implements Iocp.OverlappedChannel
       
    44 {
       
    45     private static final Unsafe unsafe = Unsafe.getUnsafe();
       
    46 
       
    47     // 2 * (sizeof(SOCKET_ADDRESS) + 16)
       
    48     private static final int DATA_BUFFER_SIZE = 88;
       
    49 
       
    50     private final long handle;
       
    51     private final int completionKey;
       
    52     private final Iocp iocp;
       
    53 
       
    54     // typically there will be zero, or one I/O operations pending. In rare
       
    55     // cases there may be more. These rare cases arise when a sequence of accept
       
    56     // operations complete immediately and handled by the initiating thread.
       
    57     // The corresponding OVERLAPPED cannot be reused/released until the completion
       
    58     // event has been posted.
       
    59     private final PendingIoCache ioCache;
       
    60 
       
    61     // the data buffer to receive the local/remote socket address
       
    62     private final long dataBuffer;
       
    63 
       
    64     // flag to indicate that an accept operation is outstanding
       
    65     private AtomicBoolean accepting = new AtomicBoolean();
       
    66 
       
    67 
       
    68     WindowsAsynchronousServerSocketChannelImpl(Iocp iocp) throws IOException {
       
    69         super(iocp);
       
    70 
       
    71         // associate socket with given completion port
       
    72         long h = IOUtil.fdVal(fd);
       
    73         int key;
       
    74         try {
       
    75             key = iocp.associate(this, h);
       
    76         } catch (IOException x) {
       
    77             closesocket0(h);   // prevent leak
       
    78             throw x;
       
    79         }
       
    80 
       
    81         this.handle = h;
       
    82         this.completionKey = key;
       
    83         this.iocp = iocp;
       
    84         this.ioCache = new PendingIoCache();
       
    85         this.dataBuffer = unsafe.allocateMemory(DATA_BUFFER_SIZE);
       
    86     }
       
    87 
       
    88     @Override
       
    89     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
       
    90         return ioCache.remove(overlapped);
       
    91     }
       
    92 
       
    93     @Override
       
    94     void implClose() throws IOException {
       
    95         // close socket (which may cause outstanding accept to be aborted).
       
    96         closesocket0(handle);
       
    97 
       
    98         // waits until the accept operations have completed
       
    99         ioCache.close();
       
   100 
       
   101         // finally disassociate from the completion port
       
   102         iocp.disassociate(completionKey);
       
   103 
       
   104         // release other resources
       
   105         unsafe.freeMemory(dataBuffer);
       
   106     }
       
   107 
       
   108     @Override
       
   109     public AsynchronousChannelGroupImpl group() {
       
   110         return iocp;
       
   111     }
       
   112 
       
   113     /**
       
   114      * Task to initiate accept operation and to handle result.
       
   115      */
       
   116     private class AcceptTask<A> implements Runnable, Iocp.ResultHandler {
       
   117         private final WindowsAsynchronousSocketChannelImpl channel;
       
   118         private final AccessControlContext acc;
       
   119         private final PendingFuture<AsynchronousSocketChannel,A> result;
       
   120 
       
   121         AcceptTask(WindowsAsynchronousSocketChannelImpl channel,
       
   122                    AccessControlContext acc,
       
   123                    PendingFuture<AsynchronousSocketChannel,A> result)
       
   124         {
       
   125             this.channel = channel;
       
   126             this.acc = acc;
       
   127             this.result = result;
       
   128         }
       
   129 
       
   130         void enableAccept() {
       
   131             accepting.set(false);
       
   132         }
       
   133 
       
   134         void closeChildChannel() {
       
   135             try {
       
   136                 channel.close();
       
   137             } catch (IOException ignore) { }
       
   138         }
       
   139 
       
   140         // caller must have acquired read lock for the listener and child channel.
       
   141         void finishAccept() throws IOException {
       
   142             /**
       
   143              * Set local/remote addresses. This is currently very inefficient
       
   144              * in that it requires 2 calls to getsockname and 2 calls to getpeername.
       
   145              * (should change this to use GetAcceptExSockaddrs)
       
   146              */
       
   147             updateAcceptContext(handle, channel.handle());
       
   148 
       
   149             InetSocketAddress local = Net.localAddress(channel.fd);
       
   150             final InetSocketAddress remote = Net.remoteAddress(channel.fd);
       
   151             channel.setConnected(local, remote);
       
   152 
       
   153             // permission check (in context of initiating thread)
       
   154             if (acc != null) {
       
   155                 AccessController.doPrivileged(new PrivilegedAction<Void>() {
       
   156                     public Void run() {
       
   157                         SecurityManager sm = System.getSecurityManager();
       
   158                         sm.checkAccept(remote.getAddress().getHostAddress(),
       
   159                                        remote.getPort());
       
   160                         return null;
       
   161                     }
       
   162                 }, acc);
       
   163             }
       
   164         }
       
   165 
       
   166         /**
       
   167          * Initiates the accept operation.
       
   168          */
       
   169         @Override
       
   170         public void run() {
       
   171             long overlapped = 0L;
       
   172 
       
   173             try {
       
   174                 // begin usage of listener socket
       
   175                 begin();
       
   176                 try {
       
   177                     // begin usage of child socket (as it is registered with
       
   178                     // completion port and so may be closed in the event that
       
   179                     // the group is forcefully closed).
       
   180                     channel.begin();
       
   181 
       
   182                     synchronized (result) {
       
   183                         overlapped = ioCache.add(result);
       
   184 
       
   185                         int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
       
   186                         if (n == IOStatus.UNAVAILABLE) {
       
   187                             return;
       
   188                         }
       
   189 
       
   190                         // connection accepted immediately
       
   191                         finishAccept();
       
   192 
       
   193                         // allow another accept before the result is set
       
   194                         enableAccept();
       
   195                         result.setResult(channel);
       
   196                     }
       
   197                 } finally {
       
   198                     // end usage on child socket
       
   199                     channel.end();
       
   200                 }
       
   201             } catch (Throwable x) {
       
   202                 // failed to initiate accept so release resources
       
   203                 if (overlapped != 0L)
       
   204                     ioCache.remove(overlapped);
       
   205                 closeChildChannel();
       
   206                 if (x instanceof ClosedChannelException)
       
   207                     x = new AsynchronousCloseException();
       
   208                 if (!(x instanceof IOException) && !(x instanceof SecurityException))
       
   209                     x = new IOException(x);
       
   210                 enableAccept();
       
   211                 result.setFailure(x);
       
   212             } finally {
       
   213                 // end of usage of listener socket
       
   214                 end();
       
   215             }
       
   216 
       
   217             // accept completed immediately but may not have executed on
       
   218             // initiating thread in which case the operation may have been
       
   219             // cancelled.
       
   220             if (result.isCancelled()) {
       
   221                 closeChildChannel();
       
   222             }
       
   223 
       
   224             // invoke completion handler
       
   225             Invoker.invokeIndirectly(result.handler(), result);
       
   226         }
       
   227 
       
   228         /**
       
   229          * Executed when the I/O has completed
       
   230          */
       
   231         @Override
       
   232         public void completed(int bytesTransferred) {
       
   233             try {
       
   234                 // connection accept after group has shutdown
       
   235                 if (iocp.isShutdown()) {
       
   236                     throw new IOException(new ShutdownChannelGroupException());
       
   237                 }
       
   238 
       
   239                 // finish the accept
       
   240                 try {
       
   241                     begin();
       
   242                     try {
       
   243                         channel.begin();
       
   244                         finishAccept();
       
   245                     } finally {
       
   246                         channel.end();
       
   247                     }
       
   248                 } finally {
       
   249                     end();
       
   250                 }
       
   251 
       
   252                 // allow another accept before the result is set
       
   253                 enableAccept();
       
   254                 result.setResult(channel);
       
   255             } catch (Throwable x) {
       
   256                 enableAccept();
       
   257                 closeChildChannel();
       
   258                 if (x instanceof ClosedChannelException)
       
   259                     x = new AsynchronousCloseException();
       
   260                 if (!(x instanceof IOException) && !(x instanceof SecurityException))
       
   261                     x = new IOException(x);
       
   262                 result.setFailure(x);
       
   263             }
       
   264 
       
   265             // if an async cancel has already cancelled the operation then
       
   266             // close the new channel so as to free resources
       
   267             if (result.isCancelled()) {
       
   268                 closeChildChannel();
       
   269             }
       
   270 
       
   271             // invoke handler (but not directly)
       
   272             Invoker.invokeIndirectly(result.handler(), result);
       
   273         }
       
   274 
       
   275         @Override
       
   276         public void failed(int error, IOException x) {
       
   277             enableAccept();
       
   278             closeChildChannel();
       
   279 
       
   280             // release waiters
       
   281             if (isOpen()) {
       
   282                 result.setFailure(x);
       
   283             } else {
       
   284                 result.setFailure(new AsynchronousCloseException());
       
   285             }
       
   286             Invoker.invokeIndirectly(result.handler(), result);
       
   287         }
       
   288     }
       
   289 
       
   290     @Override
       
   291     public <A> Future<AsynchronousSocketChannel> accept(A attachment,
       
   292         final CompletionHandler<AsynchronousSocketChannel,? super A> handler)
       
   293     {
       
   294         if (!isOpen()) {
       
   295             CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture
       
   296                 .withFailure(this, new ClosedChannelException(), attachment);
       
   297             Invoker.invokeIndirectly(handler, result);
       
   298             return result;
       
   299         }
       
   300         if (isAcceptKilled())
       
   301             throw new RuntimeException("Accept not allowed due to cancellation");
       
   302 
       
   303         // ensure channel is bound to local address
       
   304         if (localAddress == null)
       
   305             throw new NotYetBoundException();
       
   306 
       
   307         // create the socket that will be accepted. The creation of the socket
       
   308         // is enclosed by a begin/end for the listener socket to ensure that
       
   309         // we check that the listener is open and also to prevent the I/O
       
   310         // port from being closed as the new socket is registered.
       
   311         WindowsAsynchronousSocketChannelImpl ch = null;
       
   312         IOException ioe = null;
       
   313         try {
       
   314             begin();
       
   315             ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
       
   316         } catch (IOException x) {
       
   317             ioe = x;
       
   318         } finally {
       
   319             end();
       
   320         }
       
   321         if (ioe != null) {
       
   322             CompletedFuture<AsynchronousSocketChannel,A> result =
       
   323                 CompletedFuture.withFailure(this, ioe, attachment);
       
   324             Invoker.invokeIndirectly(handler, result);
       
   325             return result;
       
   326         }
       
   327 
       
   328         // need calling context when there is security manager as
       
   329         // permission check may be done in a different thread without
       
   330         // any application call frames on the stack
       
   331         AccessControlContext acc = (System.getSecurityManager() == null) ?
       
   332             null : AccessController.getContext();
       
   333 
       
   334         PendingFuture<AsynchronousSocketChannel,A> result =
       
   335             new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment);
       
   336         AcceptTask task = new AcceptTask<A>(ch, acc, result);
       
   337         result.setContext(task);
       
   338 
       
   339         // check and set flag to prevent concurrent accepting
       
   340         if (!accepting.compareAndSet(false, true))
       
   341             throw new AcceptPendingException();
       
   342 
       
   343         // initiate accept. As I/O operations are tied to the initiating thread
       
   344         // then it will only be invoked direcly if this thread is in the thread
       
   345         // pool. If this thread is not in the thread pool when a task is
       
   346         // submitted to initiate the accept.
       
   347         Invoker.invokeOnThreadInThreadPool(this, task);
       
   348         return result;
       
   349     }
       
   350 
       
   351     // -- Native methods --
       
   352 
       
   353     private static native void initIDs();
       
   354 
       
   355     private static native int accept0(long listenSocket, long acceptSocket,
       
   356         long overlapped, long dataBuffer) throws IOException;
       
   357 
       
   358     private static native void updateAcceptContext(long listenSocket,
       
   359         long acceptSocket) throws IOException;
       
   360 
       
   361     private static native void closesocket0(long socket) throws IOException;
       
   362 
       
   363     static {
       
   364         Util.load();
       
   365         initIDs();
       
   366     }
       
   367 }