jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java
changeset 2542 d859108aea12
child 3072 a801b122142f
equal deleted inserted replaced
2418:15096652c4d4 2542:d859108aea12
       
     1 /*
       
     2  * Copyright 2009 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 package sun.nio.ch;
       
    26 
       
    27 import java.net.InetAddress;
       
    28 import java.net.SocketAddress;
       
    29 import java.net.InetSocketAddress;
       
    30 import java.io.FileDescriptor;
       
    31 import java.io.IOException;
       
    32 import java.util.Collections;
       
    33 import java.util.Set;
       
    34 import java.util.HashSet;
       
    35 import java.nio.ByteBuffer;
       
    36 import java.nio.channels.SelectionKey;
       
    37 import java.nio.channels.ClosedChannelException;
       
    38 import java.nio.channels.ConnectionPendingException;
       
    39 import java.nio.channels.NoConnectionPendingException;
       
    40 import java.nio.channels.AlreadyBoundException;
       
    41 import java.nio.channels.AlreadyConnectedException;
       
    42 import java.nio.channels.NotYetBoundException;
       
    43 import java.nio.channels.NotYetConnectedException;
       
    44 import java.nio.channels.spi.SelectorProvider;
       
    45 import com.sun.nio.sctp.AbstractNotificationHandler;
       
    46 import com.sun.nio.sctp.Association;
       
    47 import com.sun.nio.sctp.AssociationChangeNotification;
       
    48 import com.sun.nio.sctp.HandlerResult;
       
    49 import com.sun.nio.sctp.IllegalReceiveException;
       
    50 import com.sun.nio.sctp.InvalidStreamException;
       
    51 import com.sun.nio.sctp.IllegalUnbindException;
       
    52 import com.sun.nio.sctp.MessageInfo;
       
    53 import com.sun.nio.sctp.NotificationHandler;
       
    54 import com.sun.nio.sctp.SctpChannel;
       
    55 import com.sun.nio.sctp.SctpSocketOption;
       
    56 import sun.nio.ch.NativeDispatcher;
       
    57 import sun.nio.ch.PollArrayWrapper;
       
    58 import sun.nio.ch.SelChImpl;
       
    59 import static com.sun.nio.sctp.SctpStandardSocketOption.*;
       
    60 import static sun.nio.ch.SctpResultContainer.SEND_FAILED;
       
    61 import static sun.nio.ch.SctpResultContainer.ASSOCIATION_CHANGED;
       
    62 import static sun.nio.ch.SctpResultContainer.PEER_ADDRESS_CHANGED;
       
    63 import static sun.nio.ch.SctpResultContainer.SHUTDOWN;
       
    64 
       
    65 /**
       
    66  * An implementation of an SctpChannel
       
    67  */
       
    68 public class SctpChannelImpl extends SctpChannel
       
    69     implements SelChImpl
       
    70 {
       
    71     /* Used to make native close and preClose calls */
       
    72     private static NativeDispatcher nd;
       
    73 
       
    74     private final FileDescriptor fd;
       
    75 
       
    76     private final int fdVal;
       
    77 
       
    78     /* IDs of native threads doing send and receivess, for signalling */
       
    79     private volatile long receiverThread = 0;
       
    80     private volatile long senderThread = 0;
       
    81 
       
    82     /* Lock held by current receiving or connecting thread */
       
    83     private final Object receiveLock = new Object();
       
    84 
       
    85     /* Lock held by current sending or connecting thread */
       
    86     private final Object sendLock = new Object();
       
    87 
       
    88     private final ThreadLocal<Boolean> receiveInvoked =
       
    89         new ThreadLocal<Boolean>() {
       
    90              @Override protected Boolean initialValue() {
       
    91                  return Boolean.FALSE;
       
    92             }
       
    93     };
       
    94 
       
    95     /* Lock held by any thread that modifies the state fields declared below
       
    96        DO NOT invoke a blocking I/O operation while holding this lock! */
       
    97     private final Object stateLock = new Object();
       
    98 
       
    99     private enum ChannelState {
       
   100         UNINITIALIZED,
       
   101         UNCONNECTED,
       
   102         PENDING,
       
   103         CONNECTED,
       
   104         KILLPENDING,
       
   105         KILLED,
       
   106     }
       
   107     /* -- The following fields are protected by stateLock -- */
       
   108     private ChannelState state = ChannelState.UNINITIALIZED;
       
   109 
       
   110     /* Binding; Once bound the port will remain constant. */
       
   111     int port = -1;
       
   112     private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
       
   113     /* Has the channel been bound to the wildcard address */
       
   114     private boolean wildcard; /* false */
       
   115     //private InetSocketAddress remoteAddress = null;
       
   116 
       
   117     /* Input/Output open */
       
   118     private boolean readyToConnect;
       
   119 
       
   120     /* Shutdown */
       
   121     private boolean isShutdown;
       
   122 
       
   123     private Association association;
       
   124 
       
   125     /* -- End of fields protected by stateLock -- */
       
   126 
       
   127     private SctpResultContainer commUpResultContainer;  /* null */
       
   128 
       
   129     /**
       
   130      * Constructor for normal connecting sockets
       
   131      */
       
   132     public SctpChannelImpl(SelectorProvider provider) throws IOException {
       
   133         //TODO: update provider remove public modifier
       
   134         super(provider);
       
   135         this.fd = SctpNet.socket(true);
       
   136         this.fdVal = IOUtil.fdVal(fd);
       
   137         this.state = ChannelState.UNCONNECTED;
       
   138     }
       
   139 
       
   140     /**
       
   141      * Constructor for sockets obtained from server sockets
       
   142      */
       
   143     public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
       
   144          throws IOException {
       
   145         super(provider);
       
   146         this.fd = fd;
       
   147         this.fdVal = IOUtil.fdVal(fd);
       
   148         this.state = ChannelState.CONNECTED;
       
   149         port = (Net.localAddress(fd)).getPort();
       
   150 
       
   151         /* Receive COMM_UP */
       
   152         ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
       
   153         try {
       
   154             receive(buf, null, null, true);
       
   155         } finally {
       
   156             Util.releaseTemporaryDirectBuffer(buf);
       
   157         }
       
   158     }
       
   159 
       
   160     /**
       
   161      * Binds the channel's socket to a local address.
       
   162      */
       
   163     @Override
       
   164     public SctpChannel bind(SocketAddress local) throws IOException {
       
   165         synchronized (receiveLock) {
       
   166             synchronized (sendLock) {
       
   167                 synchronized (stateLock) {
       
   168                     ensureOpenAndUnconnected();
       
   169                     if (isBound())
       
   170                         throw new AlreadyBoundException();
       
   171                     InetSocketAddress isa = (local == null) ?
       
   172                         new InetSocketAddress(0) : Net.checkAddress(local);
       
   173                     Net.bind(fd, isa.getAddress(), isa.getPort());
       
   174                     InetSocketAddress boundIsa = Net.localAddress(fd);
       
   175                     port = boundIsa.getPort();
       
   176                     localAddresses.add(isa);
       
   177                     if (isa.getAddress().isAnyLocalAddress())
       
   178                         wildcard = true;
       
   179                 }
       
   180             }
       
   181         }
       
   182         return this;
       
   183     }
       
   184 
       
   185     @Override
       
   186     public SctpChannel bindAddress(InetAddress address)
       
   187             throws IOException {
       
   188         bindUnbindAddress(address, true);
       
   189         localAddresses.add(new InetSocketAddress(address, port));
       
   190         return this;
       
   191     }
       
   192 
       
   193     @Override
       
   194     public SctpChannel unbindAddress(InetAddress address)
       
   195             throws IOException {
       
   196         bindUnbindAddress(address, false);
       
   197         localAddresses.remove(new InetSocketAddress(address, port));
       
   198         return this;
       
   199     }
       
   200 
       
   201     private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
       
   202             throws IOException {
       
   203         if (address == null)
       
   204             throw new IllegalArgumentException();
       
   205 
       
   206         synchronized (receiveLock) {
       
   207             synchronized (sendLock) {
       
   208                 synchronized (stateLock) {
       
   209                     if (!isOpen())
       
   210                         throw new ClosedChannelException();
       
   211                     if (!isBound())
       
   212                         throw new NotYetBoundException();
       
   213                     if (wildcard)
       
   214                         throw new IllegalStateException(
       
   215                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
       
   216                     if (address.isAnyLocalAddress())
       
   217                         throw new IllegalArgumentException(
       
   218                                 "Cannot add or remove the wildcard address");
       
   219                     if (add) {
       
   220                         for (InetSocketAddress addr : localAddresses) {
       
   221                             if (addr.getAddress().equals(address)) {
       
   222                                 throw new AlreadyBoundException();
       
   223                             }
       
   224                         }
       
   225                     } else { /*removing */
       
   226                         /* Verify that there is more than one address
       
   227                          * and that address is already bound */
       
   228                         if (localAddresses.size() <= 1)
       
   229                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
       
   230                         boolean foundAddress = false;
       
   231                         for (InetSocketAddress addr : localAddresses) {
       
   232                             if (addr.getAddress().equals(address)) {
       
   233                                 foundAddress = true;
       
   234                                 break;
       
   235                             }
       
   236                         }
       
   237                         if (!foundAddress )
       
   238                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
       
   239                     }
       
   240 
       
   241                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
       
   242 
       
   243                     /* Update our internal Set to reflect the addition/removal */
       
   244                     if (add)
       
   245                         localAddresses.add(new InetSocketAddress(address, port));
       
   246                     else {
       
   247                         for (InetSocketAddress addr : localAddresses) {
       
   248                             if (addr.getAddress().equals(address)) {
       
   249                                 localAddresses.remove(addr);
       
   250                                 break;
       
   251                             }
       
   252                         }
       
   253                     }
       
   254                 }
       
   255             }
       
   256         }
       
   257         return this;
       
   258     }
       
   259 
       
   260     private boolean isBound() {
       
   261         synchronized (stateLock) {
       
   262             return port == -1 ? false : true;
       
   263         }
       
   264     }
       
   265 
       
   266     private boolean isConnected() {
       
   267         synchronized (stateLock) {
       
   268             return (state == ChannelState.CONNECTED);
       
   269         }
       
   270     }
       
   271 
       
   272     private void ensureOpenAndUnconnected() throws IOException {
       
   273         synchronized (stateLock) {
       
   274             if (!isOpen())
       
   275                 throw new ClosedChannelException();
       
   276             if (isConnected())
       
   277                 throw new AlreadyConnectedException();
       
   278             if (state == ChannelState.PENDING)
       
   279                 throw new ConnectionPendingException();
       
   280         }
       
   281     }
       
   282 
       
   283     private boolean ensureReceiveOpen() throws ClosedChannelException {
       
   284         synchronized (stateLock) {
       
   285             if (!isOpen())
       
   286                 throw new ClosedChannelException();
       
   287             if (!isConnected())
       
   288                 throw new NotYetConnectedException();
       
   289             else
       
   290                 return true;
       
   291         }
       
   292     }
       
   293 
       
   294     private void ensureSendOpen() throws ClosedChannelException {
       
   295         synchronized (stateLock) {
       
   296             if (!isOpen())
       
   297                 throw new ClosedChannelException();
       
   298             if (isShutdown)
       
   299                 throw new ClosedChannelException();
       
   300             if (!isConnected())
       
   301                 throw new NotYetConnectedException();
       
   302         }
       
   303     }
       
   304 
       
   305     private void receiverCleanup() throws IOException {
       
   306         synchronized (stateLock) {
       
   307             receiverThread = 0;
       
   308             if (state == ChannelState.KILLPENDING)
       
   309                 kill();
       
   310         }
       
   311     }
       
   312 
       
   313     private void senderCleanup() throws IOException {
       
   314         synchronized (stateLock) {
       
   315             senderThread = 0;
       
   316             if (state == ChannelState.KILLPENDING)
       
   317                 kill();
       
   318         }
       
   319     }
       
   320 
       
   321     @Override
       
   322     public Association association() throws ClosedChannelException {
       
   323         synchronized (stateLock) {
       
   324             if (!isOpen())
       
   325                 throw new ClosedChannelException();
       
   326             if (!isConnected())
       
   327                 return null;
       
   328 
       
   329             return association;
       
   330         }
       
   331     }
       
   332 
       
   333     @Override
       
   334     public boolean connect(SocketAddress endpoint) throws IOException {
       
   335         synchronized (receiveLock) {
       
   336             synchronized (sendLock) {
       
   337                 ensureOpenAndUnconnected();
       
   338                 InetSocketAddress isa = Net.checkAddress(endpoint);
       
   339                 SecurityManager sm = System.getSecurityManager();
       
   340                 if (sm != null)
       
   341                     sm.checkConnect(isa.getAddress().getHostAddress(),
       
   342                                     isa.getPort());
       
   343                 synchronized (blockingLock()) {
       
   344                     int n = 0;
       
   345                     try {
       
   346                         try {
       
   347                             begin();
       
   348                             synchronized (stateLock) {
       
   349                                 if (!isOpen()) {
       
   350                                     return false;
       
   351                                 }
       
   352                                 receiverThread = NativeThread.current();
       
   353                             }
       
   354                             for (;;) {
       
   355                                 InetAddress ia = isa.getAddress();
       
   356                                 if (ia.isAnyLocalAddress())
       
   357                                     ia = InetAddress.getLocalHost();
       
   358                                 n = Net.connect(fd, ia, isa.getPort());
       
   359                                 if (  (n == IOStatus.INTERRUPTED)
       
   360                                       && isOpen())
       
   361                                     continue;
       
   362                                 break;
       
   363                             }
       
   364                         } finally {
       
   365                             receiverCleanup();
       
   366                             end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   367                             assert IOStatus.check(n);
       
   368                         }
       
   369                     } catch (IOException x) {
       
   370                         /* If an exception was thrown, close the channel after
       
   371                          * invoking end() so as to avoid bogus
       
   372                          * AsynchronousCloseExceptions */
       
   373                         close();
       
   374                         throw x;
       
   375                     }
       
   376 
       
   377                     if (n > 0) {
       
   378                         synchronized (stateLock) {
       
   379                             /* Connection succeeded */
       
   380                             state = ChannelState.CONNECTED;
       
   381                             if (!isBound()) {
       
   382                                 InetSocketAddress boundIsa =
       
   383                                         Net.localAddress(fd);
       
   384                                 port = boundIsa.getPort();
       
   385                             }
       
   386 
       
   387                             /* Receive COMM_UP */
       
   388                             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
       
   389                             try {
       
   390                                 receive(buf, null, null, true);
       
   391                             } finally {
       
   392                                 Util.releaseTemporaryDirectBuffer(buf);
       
   393                             }
       
   394                             return true;
       
   395                         }
       
   396                     } else  {
       
   397                         synchronized (stateLock) {
       
   398                             /* If nonblocking and no exception then connection
       
   399                              * pending; disallow another invocation */
       
   400                             if (!isBlocking())
       
   401                                 state = ChannelState.PENDING;
       
   402                             else
       
   403                                 assert false;
       
   404                         }
       
   405                     }
       
   406                 }
       
   407                 return false;
       
   408             }
       
   409         }
       
   410     }
       
   411 
       
   412     @Override
       
   413     public boolean connect(SocketAddress endpoint,
       
   414                            int maxOutStreams,
       
   415                            int maxInStreams)
       
   416             throws IOException {
       
   417         return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
       
   418                 create(maxInStreams, maxOutStreams)).connect(endpoint);
       
   419 
       
   420     }
       
   421 
       
   422     @Override
       
   423     public boolean isConnectionPending() {
       
   424         synchronized (stateLock) {
       
   425             return (state == ChannelState.PENDING);
       
   426         }
       
   427     }
       
   428 
       
   429     @Override
       
   430     public boolean finishConnect() throws IOException {
       
   431         synchronized (receiveLock) {
       
   432             synchronized (sendLock) {
       
   433                 synchronized (stateLock) {
       
   434                     if (!isOpen())
       
   435                         throw new ClosedChannelException();
       
   436                     if (isConnected())
       
   437                         return true;
       
   438                     if (state != ChannelState.PENDING)
       
   439                         throw new NoConnectionPendingException();
       
   440                 }
       
   441                 int n = 0;
       
   442                 try {
       
   443                     try {
       
   444                         begin();
       
   445                         synchronized (blockingLock()) {
       
   446                             synchronized (stateLock) {
       
   447                                 if (!isOpen()) {
       
   448                                     return false;
       
   449                                 }
       
   450                                 receiverThread = NativeThread.current();
       
   451                             }
       
   452                             if (!isBlocking()) {
       
   453                                 for (;;) {
       
   454                                     n = checkConnect(fd, false, readyToConnect);
       
   455                                     if (  (n == IOStatus.INTERRUPTED)
       
   456                                           && isOpen())
       
   457                                         continue;
       
   458                                     break;
       
   459                                 }
       
   460                             } else {
       
   461                                 for (;;) {
       
   462                                     n = checkConnect(fd, true, readyToConnect);
       
   463                                     if (n == 0) {
       
   464                                         // Loop in case of
       
   465                                         // spurious notifications
       
   466                                         continue;
       
   467                                     }
       
   468                                     if (  (n == IOStatus.INTERRUPTED)
       
   469                                           && isOpen())
       
   470                                         continue;
       
   471                                     break;
       
   472                                 }
       
   473                             }
       
   474                         }
       
   475                     } finally {
       
   476                         synchronized (stateLock) {
       
   477                             receiverThread = 0;
       
   478                             if (state == ChannelState.KILLPENDING) {
       
   479                                 kill();
       
   480                                 /* poll()/getsockopt() does not report
       
   481                                  * error (throws exception, with n = 0)
       
   482                                  * on Linux platform after dup2 and
       
   483                                  * signal-wakeup. Force n to 0 so the
       
   484                                  * end() can throw appropriate exception */
       
   485                                 n = 0;
       
   486                             }
       
   487                         }
       
   488                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   489                         assert IOStatus.check(n);
       
   490                     }
       
   491                 } catch (IOException x) {
       
   492                     /* If an exception was thrown, close the channel after
       
   493                      * invoking end() so as to avoid bogus
       
   494                      * AsynchronousCloseExceptions */
       
   495                     close();
       
   496                     throw x;
       
   497                 }
       
   498 
       
   499                 if (n > 0) {
       
   500                     synchronized (stateLock) {
       
   501                         state = ChannelState.CONNECTED;
       
   502                         if (!isBound()) {
       
   503                             InetSocketAddress boundIsa =
       
   504                                     Net.localAddress(fd);
       
   505                             port = boundIsa.getPort();
       
   506                         }
       
   507 
       
   508                         /* Receive COMM_UP */
       
   509                         ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
       
   510                         try {
       
   511                             receive(buf, null, null, true);
       
   512                         } finally {
       
   513                             Util.releaseTemporaryDirectBuffer(buf);
       
   514                         }
       
   515                         return true;
       
   516                     }
       
   517                 }
       
   518             }
       
   519         }
       
   520         return false;
       
   521     }
       
   522 
       
   523     @Override
       
   524     protected void implConfigureBlocking(boolean block) throws IOException {
       
   525         IOUtil.configureBlocking(fd, block);
       
   526     }
       
   527 
       
   528     @Override
       
   529     public void implCloseSelectableChannel() throws IOException {
       
   530         synchronized (stateLock) {
       
   531             nd.preClose(fd);
       
   532 
       
   533             if (receiverThread != 0)
       
   534                 NativeThread.signal(receiverThread);
       
   535 
       
   536             if (senderThread != 0)
       
   537                 NativeThread.signal(senderThread);
       
   538 
       
   539             if (!isRegistered())
       
   540                 kill();
       
   541         }
       
   542     }
       
   543 
       
   544     @Override
       
   545     public FileDescriptor getFD() {
       
   546         return fd;
       
   547     }
       
   548 
       
   549     @Override
       
   550     public int getFDVal() {
       
   551         return fdVal;
       
   552     }
       
   553 
       
   554     /**
       
   555      * Translates native poll revent ops into a ready operation ops
       
   556      */
       
   557     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
       
   558         int intOps = sk.nioInterestOps();
       
   559         int oldOps = sk.nioReadyOps();
       
   560         int newOps = initialOps;
       
   561 
       
   562         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
       
   563             /* This should only happen if this channel is pre-closed while a
       
   564              * selection operation is in progress
       
   565              * ## Throw an error if this channel has not been pre-closed */
       
   566             return false;
       
   567         }
       
   568 
       
   569         if ((ops & (PollArrayWrapper.POLLERR
       
   570                     | PollArrayWrapper.POLLHUP)) != 0) {
       
   571             newOps = intOps;
       
   572             sk.nioReadyOps(newOps);
       
   573             /* No need to poll again in checkConnect,
       
   574              * the error will be detected there */
       
   575             readyToConnect = true;
       
   576             return (newOps & ~oldOps) != 0;
       
   577         }
       
   578 
       
   579         if (((ops & PollArrayWrapper.POLLIN) != 0) &&
       
   580             ((intOps & SelectionKey.OP_READ) != 0) &&
       
   581             isConnected())
       
   582             newOps |= SelectionKey.OP_READ;
       
   583 
       
   584         if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
       
   585             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
       
   586             ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
       
   587             newOps |= SelectionKey.OP_CONNECT;
       
   588             readyToConnect = true;
       
   589         }
       
   590 
       
   591         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
       
   592             ((intOps & SelectionKey.OP_WRITE) != 0) &&
       
   593             isConnected())
       
   594             newOps |= SelectionKey.OP_WRITE;
       
   595 
       
   596         sk.nioReadyOps(newOps);
       
   597         return (newOps & ~oldOps) != 0;
       
   598     }
       
   599 
       
   600     @Override
       
   601     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
       
   602         return translateReadyOps(ops, sk.nioReadyOps(), sk);
       
   603     }
       
   604 
       
   605     @Override
       
   606     @SuppressWarnings("all")
       
   607     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
       
   608         return translateReadyOps(ops, 0, sk);
       
   609     }
       
   610 
       
   611     @Override
       
   612     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
       
   613         int newOps = 0;
       
   614         if ((ops & SelectionKey.OP_READ) != 0)
       
   615             newOps |= PollArrayWrapper.POLLIN;
       
   616         if ((ops & SelectionKey.OP_WRITE) != 0)
       
   617             newOps |= PollArrayWrapper.POLLOUT;
       
   618         if ((ops & SelectionKey.OP_CONNECT) != 0)
       
   619             newOps |= PollArrayWrapper.POLLCONN;
       
   620         sk.selector.putEventOps(sk, newOps);
       
   621     }
       
   622 
       
   623     @Override
       
   624     public void kill() throws IOException {
       
   625         synchronized (stateLock) {
       
   626             if (state == ChannelState.KILLED)
       
   627                 return;
       
   628             if (state == ChannelState.UNINITIALIZED) {
       
   629                 state = ChannelState.KILLED;
       
   630                 return;
       
   631             }
       
   632             assert !isOpen() && !isRegistered();
       
   633 
       
   634             /* Postpone the kill if there is a waiting reader
       
   635              * or writer thread. */
       
   636             if (receiverThread == 0 && senderThread == 0) {
       
   637                 nd.close(fd);
       
   638                 state = ChannelState.KILLED;
       
   639             } else {
       
   640                 state = ChannelState.KILLPENDING;
       
   641             }
       
   642         }
       
   643     }
       
   644 
       
   645     @Override
       
   646     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
       
   647             throws IOException {
       
   648         if (name == null)
       
   649             throw new NullPointerException();
       
   650         if (!supportedOptions().contains(name))
       
   651             throw new UnsupportedOperationException("'" + name + "' not supported");
       
   652 
       
   653         synchronized (stateLock) {
       
   654             if (!isOpen())
       
   655                 throw new ClosedChannelException();
       
   656 
       
   657             SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
       
   658         }
       
   659         return this;
       
   660     }
       
   661 
       
   662     @Override
       
   663     @SuppressWarnings("unchecked")
       
   664     public <T> T getOption(SctpSocketOption<T> name) throws IOException {
       
   665         if (name == null)
       
   666             throw new NullPointerException();
       
   667         if (!supportedOptions().contains(name))
       
   668             throw new UnsupportedOperationException("'" + name + "' not supported");
       
   669 
       
   670         synchronized (stateLock) {
       
   671             if (!isOpen())
       
   672                 throw new ClosedChannelException();
       
   673 
       
   674             return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
       
   675         }
       
   676     }
       
   677 
       
   678     private static class DefaultOptionsHolder {
       
   679         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
       
   680 
       
   681         private static Set<SctpSocketOption<?>> defaultOptions() {
       
   682             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
       
   683             set.add(SCTP_DISABLE_FRAGMENTS);
       
   684             set.add(SCTP_EXPLICIT_COMPLETE);
       
   685             set.add(SCTP_FRAGMENT_INTERLEAVE);
       
   686             set.add(SCTP_INIT_MAXSTREAMS);
       
   687             set.add(SCTP_NODELAY);
       
   688             set.add(SCTP_PRIMARY_ADDR);
       
   689             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
       
   690             set.add(SO_SNDBUF);
       
   691             set.add(SO_RCVBUF);
       
   692             set.add(SO_LINGER);
       
   693             return Collections.unmodifiableSet(set);
       
   694         }
       
   695     }
       
   696 
       
   697     @Override
       
   698     public final Set<SctpSocketOption<?>> supportedOptions() {
       
   699         return DefaultOptionsHolder.defaultOptions;
       
   700     }
       
   701 
       
   702     @Override
       
   703     public <T> MessageInfo receive(ByteBuffer buffer,
       
   704                                    T attachment,
       
   705                                    NotificationHandler<T> handler)
       
   706             throws IOException {
       
   707         return receive(buffer, attachment, handler, false);
       
   708     }
       
   709 
       
   710     private <T> MessageInfo receive(ByteBuffer buffer,
       
   711                                     T attachment,
       
   712                                     NotificationHandler<T> handler,
       
   713                                     boolean fromConnect)
       
   714             throws IOException {
       
   715         if (buffer == null)
       
   716             throw new IllegalArgumentException("buffer cannot be null");
       
   717 
       
   718         if (buffer.isReadOnly())
       
   719             throw new IllegalArgumentException("Read-only buffer");
       
   720 
       
   721         if (receiveInvoked.get())
       
   722             throw new IllegalReceiveException(
       
   723                     "cannot invoke receive from handler");
       
   724         receiveInvoked.set(Boolean.TRUE);
       
   725 
       
   726         try {
       
   727             SctpResultContainer resultContainer = new SctpResultContainer();
       
   728             do {
       
   729                 resultContainer.clear();
       
   730                 synchronized (receiveLock) {
       
   731                     if (!ensureReceiveOpen())
       
   732                         return null;
       
   733 
       
   734                     if (commUpResultContainer != null) {
       
   735                         resultContainer = commUpResultContainer;
       
   736                         commUpResultContainer = null;
       
   737                         continue;
       
   738                     }
       
   739 
       
   740                     int n = 0;
       
   741                     try {
       
   742                         begin();
       
   743 
       
   744                         synchronized (stateLock) {
       
   745                             if(!isOpen())
       
   746                                 return null;
       
   747                             receiverThread = NativeThread.current();
       
   748                         }
       
   749 
       
   750                         do {
       
   751                             n = receive(fdVal, buffer, resultContainer);
       
   752                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
       
   753                     } finally {
       
   754                         receiverCleanup();
       
   755                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   756                         assert IOStatus.check(n);
       
   757                     }
       
   758 
       
   759                     if (!resultContainer.isNotification()) {
       
   760                         /* message or nothing */
       
   761                         if (resultContainer.hasSomething()) {
       
   762                             /* Set the association before returning */
       
   763                             SctpMessageInfoImpl info =
       
   764                                     resultContainer.getMessageInfo();
       
   765                             synchronized (stateLock) {
       
   766                                 assert association != null;
       
   767                                 info.setAssociation(association);
       
   768                             }
       
   769                             return info;
       
   770                         } else
       
   771                             /* Non-blocking may return null if nothing available*/
       
   772                             return null;
       
   773                     } else { /* notification */
       
   774                         synchronized (stateLock) {
       
   775                             handleNotificationInternal(
       
   776                                     resultContainer);
       
   777                         }
       
   778                     }
       
   779 
       
   780                     if (fromConnect)  {
       
   781                         /* If we reach here, then it was connect that invoked
       
   782                          * receive an received the COMM_UP. Save it and allow
       
   783                          * the user handler to process it upon next receive. */
       
   784                         commUpResultContainer = resultContainer;
       
   785                         return null;
       
   786                     }
       
   787                 }  /* receiveLock */
       
   788             } while (handler == null ? true :
       
   789                 (invokeNotificationHandler(resultContainer, handler, attachment)
       
   790                  == HandlerResult.CONTINUE));
       
   791 
       
   792             return null;
       
   793         } finally {
       
   794             receiveInvoked.set(Boolean.FALSE);
       
   795         }
       
   796     }
       
   797 
       
   798     private int receive(int fd,
       
   799                         ByteBuffer dst,
       
   800                         SctpResultContainer resultContainer)
       
   801             throws IOException {
       
   802         int pos = dst.position();
       
   803         int lim = dst.limit();
       
   804         assert (pos <= lim);
       
   805         int rem = (pos <= lim ? lim - pos : 0);
       
   806         if (dst instanceof DirectBuffer && rem > 0)
       
   807             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
       
   808 
       
   809         /* Substitute a native buffer */
       
   810         int newSize = Math.max(rem, 1);
       
   811         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
       
   812         try {
       
   813             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
       
   814             bb.flip();
       
   815             if (n > 0 && rem > 0)
       
   816                 dst.put(bb);
       
   817             return n;
       
   818         } finally {
       
   819             Util.releaseTemporaryDirectBuffer(bb);
       
   820         }
       
   821     }
       
   822 
       
   823     private int receiveIntoNativeBuffer(int fd,
       
   824                                         SctpResultContainer resultContainer,
       
   825                                         ByteBuffer bb,
       
   826                                         int rem,
       
   827                                         int pos)
       
   828         throws IOException
       
   829     {
       
   830         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem);
       
   831 
       
   832         if (n > 0)
       
   833             bb.position(pos + n);
       
   834         return n;
       
   835     }
       
   836 
       
   837     private InternalNotificationHandler<?> internalNotificationHandler =
       
   838             new InternalNotificationHandler();
       
   839 
       
   840     private void handleNotificationInternal(SctpResultContainer resultContainer)
       
   841     {
       
   842         invokeNotificationHandler(resultContainer,
       
   843                 internalNotificationHandler, null);
       
   844     }
       
   845 
       
   846     private class InternalNotificationHandler<T>
       
   847             extends AbstractNotificationHandler<T>
       
   848     {
       
   849         @Override
       
   850         public HandlerResult handleNotification(
       
   851                 AssociationChangeNotification not, T unused) {
       
   852             if (not.event().equals(
       
   853                     AssociationChangeNotification.AssocChangeEvent.COMM_UP)) {
       
   854                 assert association == null;
       
   855                 SctpAssocChange sac = (SctpAssocChange) not;
       
   856                 association = new SctpAssociationImpl
       
   857                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
       
   858             }
       
   859             return HandlerResult.CONTINUE;
       
   860         }
       
   861     }
       
   862 
       
   863     private <T> HandlerResult invokeNotificationHandler
       
   864                                  (SctpResultContainer resultContainer,
       
   865                                   NotificationHandler<T> handler,
       
   866                                   T attachment) {
       
   867         SctpNotification notification = resultContainer.notification();
       
   868         synchronized (stateLock) {
       
   869             notification.setAssociation(association);
       
   870         }
       
   871 
       
   872         if (!(handler instanceof AbstractNotificationHandler)) {
       
   873             return handler.handleNotification(notification, attachment);
       
   874         }
       
   875 
       
   876         /* AbstractNotificationHandler */
       
   877         AbstractNotificationHandler absHandler =
       
   878                 (AbstractNotificationHandler)handler;
       
   879         switch(resultContainer.type()) {
       
   880             case ASSOCIATION_CHANGED :
       
   881                 return absHandler.handleNotification(
       
   882                         resultContainer.getAssociationChanged(), attachment);
       
   883             case PEER_ADDRESS_CHANGED :
       
   884                 return absHandler.handleNotification(
       
   885                         resultContainer.getPeerAddressChanged(), attachment);
       
   886             case SEND_FAILED :
       
   887                 return absHandler.handleNotification(
       
   888                         resultContainer.getSendFailed(), attachment);
       
   889             case SHUTDOWN :
       
   890                 return absHandler.handleNotification(
       
   891                         resultContainer.getShutdown(), attachment);
       
   892             default :
       
   893                 /* implementation specific handlers */
       
   894                 return absHandler.handleNotification(
       
   895                         resultContainer.notification(), attachment);
       
   896         }
       
   897     }
       
   898 
       
   899     private void checkAssociation(Association sendAssociation) {
       
   900         synchronized (stateLock) {
       
   901             if (sendAssociation != null && !sendAssociation.equals(association)) {
       
   902                 throw new IllegalArgumentException(
       
   903                         "Cannot send to another association");
       
   904             }
       
   905         }
       
   906     }
       
   907 
       
   908     private void checkStreamNumber(int streamNumber) {
       
   909         synchronized (stateLock) {
       
   910             if (association != null) {
       
   911                 if (streamNumber < 0 ||
       
   912                       streamNumber >= association.maxOutboundStreams())
       
   913                     throw new InvalidStreamException();
       
   914             }
       
   915         }
       
   916     }
       
   917 
       
   918     /* TODO: Add support for ttl and isComplete to both 121 12M
       
   919      *       SCTP_EOR not yet supported on reference platforms
       
   920      *       TTL support limited...
       
   921      */
       
   922     @Override
       
   923     public int send(ByteBuffer buffer, MessageInfo messageInfo)
       
   924             throws IOException {
       
   925         if (buffer == null)
       
   926             throw new IllegalArgumentException("buffer cannot be null");
       
   927 
       
   928         if (messageInfo == null)
       
   929             throw new IllegalArgumentException("messageInfo cannot be null");
       
   930 
       
   931         checkAssociation(messageInfo.association());
       
   932         checkStreamNumber(messageInfo.streamNumber());
       
   933 
       
   934         synchronized (sendLock) {
       
   935             ensureSendOpen();
       
   936 
       
   937             int n = 0;
       
   938             try {
       
   939                 begin();
       
   940 
       
   941                 synchronized (stateLock) {
       
   942                     if(!isOpen())
       
   943                         return 0;
       
   944                     senderThread = NativeThread.current();
       
   945                 }
       
   946 
       
   947                 do {
       
   948                     n = send(fdVal, buffer, messageInfo);
       
   949                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
       
   950 
       
   951                 return IOStatus.normalize(n);
       
   952             } finally {
       
   953                 senderCleanup();
       
   954                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   955                 assert IOStatus.check(n);
       
   956             }
       
   957         }
       
   958     }
       
   959 
       
   960     private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
       
   961             throws IOException {
       
   962         int streamNumber = messageInfo.streamNumber();
       
   963         SocketAddress target = messageInfo.address();
       
   964         boolean unordered = messageInfo.isUnordered();
       
   965         int ppid = messageInfo.payloadProtocolID();
       
   966         int pos = src.position();
       
   967         int lim = src.limit();
       
   968 
       
   969         assert (pos <= lim && streamNumber > 0);
       
   970         int rem = (pos <= lim ? lim - pos : 0);
       
   971 
       
   972         if (src instanceof DirectBuffer)
       
   973             return sendFromNativeBuffer(fd, src, rem, pos, target, streamNumber,
       
   974                     unordered, ppid);
       
   975 
       
   976         /* Substitute a native buffer */
       
   977         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
       
   978         try {
       
   979             bb.put(src);
       
   980             bb.flip();
       
   981             /* Do not update src until we see how many bytes were written */
       
   982             src.position(pos);
       
   983 
       
   984             int n = sendFromNativeBuffer(fd, bb, rem, pos, target, streamNumber,
       
   985                     unordered, ppid);
       
   986             if (n > 0) {
       
   987                 /* now update src */
       
   988                 src.position(pos + n);
       
   989             }
       
   990             return n;
       
   991         } finally {
       
   992             Util.releaseTemporaryDirectBuffer(bb);
       
   993         }
       
   994     }
       
   995 
       
   996     private int sendFromNativeBuffer(int fd,
       
   997                                      ByteBuffer bb,
       
   998                                      int rem,
       
   999                                      int pos,
       
  1000                                      SocketAddress target,
       
  1001                                      int streamNumber,
       
  1002                                      boolean unordered,
       
  1003                                      int ppid)
       
  1004             throws IOException {
       
  1005         int written = send0(fd, ((DirectBuffer)bb).address() + pos,
       
  1006                             rem, target, -1 /*121*/, streamNumber, unordered, ppid);
       
  1007         if (written > 0)
       
  1008             bb.position(pos + written);
       
  1009         return written;
       
  1010     }
       
  1011 
       
  1012     @Override
       
  1013     public SctpChannel shutdown() throws IOException {
       
  1014         synchronized(stateLock) {
       
  1015             if (isShutdown)
       
  1016                 return this;
       
  1017 
       
  1018             ensureSendOpen();
       
  1019             SctpNet.shutdown(fdVal, -1);
       
  1020             if (senderThread != 0)
       
  1021                 NativeThread.signal(senderThread);
       
  1022             isShutdown = true;
       
  1023         }
       
  1024         return this;
       
  1025     }
       
  1026 
       
  1027     @Override
       
  1028     public Set<SocketAddress> getAllLocalAddresses()
       
  1029             throws IOException {
       
  1030         synchronized (stateLock) {
       
  1031             if (!isOpen())
       
  1032                 throw new ClosedChannelException();
       
  1033             if (!isBound())
       
  1034                 return Collections.EMPTY_SET;
       
  1035 
       
  1036             return SctpNet.getLocalAddresses(fdVal);
       
  1037         }
       
  1038     }
       
  1039 
       
  1040     @Override
       
  1041     public Set<SocketAddress> getRemoteAddresses()
       
  1042             throws IOException {
       
  1043         synchronized (stateLock) {
       
  1044             if (!isOpen())
       
  1045                 throw new ClosedChannelException();
       
  1046             if (!isConnected())
       
  1047                 return Collections.EMPTY_SET;
       
  1048 
       
  1049             return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
       
  1050         }
       
  1051     }
       
  1052 
       
  1053     /* Native */
       
  1054     private static native void initIDs();
       
  1055 
       
  1056     static native int receive0(int fd, SctpResultContainer resultContainer,
       
  1057             long address, int length) throws IOException;
       
  1058 
       
  1059     static native int send0(int fd, long address, int length,
       
  1060             SocketAddress target, int assocId, int streamNumber,
       
  1061             boolean unordered, int ppid) throws IOException;
       
  1062 
       
  1063     private static native int checkConnect(FileDescriptor fd, boolean block,
       
  1064             boolean ready) throws IOException;
       
  1065 
       
  1066     static {
       
  1067         Util.load();   /* loads nio & net native libraries */
       
  1068         java.security.AccessController.doPrivileged(
       
  1069                 new sun.security.action.LoadLibraryAction("sctp"));
       
  1070         initIDs();
       
  1071         nd = new SctpSocketDispatcher();
       
  1072     }
       
  1073 }