jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java
changeset 12128 09f5d262e329
parent 12127 eab1e4be8495
parent 11872 c51754cddc03
child 12129 561811d8ba18
equal deleted inserted replaced
12127:eab1e4be8495 12128:09f5d262e329
     1 /*
       
     2  * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package sun.nio.ch;
       
    26 
       
    27 import java.net.InetAddress;
       
    28 import java.net.SocketAddress;
       
    29 import java.net.SocketException;
       
    30 import java.net.InetSocketAddress;
       
    31 import java.io.FileDescriptor;
       
    32 import java.io.IOException;
       
    33 import java.util.Collections;
       
    34 import java.util.Set;
       
    35 import java.util.HashSet;
       
    36 import java.nio.ByteBuffer;
       
    37 import java.nio.channels.SelectionKey;
       
    38 import java.nio.channels.ClosedChannelException;
       
    39 import java.nio.channels.ConnectionPendingException;
       
    40 import java.nio.channels.NoConnectionPendingException;
       
    41 import java.nio.channels.AlreadyConnectedException;
       
    42 import java.nio.channels.NotYetBoundException;
       
    43 import java.nio.channels.NotYetConnectedException;
       
    44 import java.nio.channels.spi.SelectorProvider;
       
    45 import com.sun.nio.sctp.AbstractNotificationHandler;
       
    46 import com.sun.nio.sctp.Association;
       
    47 import com.sun.nio.sctp.AssociationChangeNotification;
       
    48 import com.sun.nio.sctp.HandlerResult;
       
    49 import com.sun.nio.sctp.IllegalReceiveException;
       
    50 import com.sun.nio.sctp.InvalidStreamException;
       
    51 import com.sun.nio.sctp.IllegalUnbindException;
       
    52 import com.sun.nio.sctp.MessageInfo;
       
    53 import com.sun.nio.sctp.NotificationHandler;
       
    54 import com.sun.nio.sctp.SctpChannel;
       
    55 import com.sun.nio.sctp.SctpSocketOption;
       
    56 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
       
    57 import static sun.nio.ch.SctpResultContainer.SEND_FAILED;
       
    58 import static sun.nio.ch.SctpResultContainer.ASSOCIATION_CHANGED;
       
    59 import static sun.nio.ch.SctpResultContainer.PEER_ADDRESS_CHANGED;
       
    60 import static sun.nio.ch.SctpResultContainer.SHUTDOWN;
       
    61 
       
    62 /**
       
    63  * An implementation of an SctpChannel
       
    64  */
       
    65 public class SctpChannelImpl extends SctpChannel
       
    66     implements SelChImpl
       
    67 {
       
    68     private final FileDescriptor fd;
       
    69 
       
    70     private final int fdVal;
       
    71 
       
    72     /* IDs of native threads doing send and receivess, for signalling */
       
    73     private volatile long receiverThread = 0;
       
    74     private volatile long senderThread = 0;
       
    75 
       
    76     /* Lock held by current receiving or connecting thread */
       
    77     private final Object receiveLock = new Object();
       
    78 
       
    79     /* Lock held by current sending or connecting thread */
       
    80     private final Object sendLock = new Object();
       
    81 
       
    82     private final ThreadLocal<Boolean> receiveInvoked =
       
    83         new ThreadLocal<Boolean>() {
       
    84              @Override protected Boolean initialValue() {
       
    85                  return Boolean.FALSE;
       
    86             }
       
    87     };
       
    88 
       
    89     /* Lock held by any thread that modifies the state fields declared below
       
    90        DO NOT invoke a blocking I/O operation while holding this lock! */
       
    91     private final Object stateLock = new Object();
       
    92 
       
    93     private enum ChannelState {
       
    94         UNINITIALIZED,
       
    95         UNCONNECTED,
       
    96         PENDING,
       
    97         CONNECTED,
       
    98         KILLPENDING,
       
    99         KILLED,
       
   100     }
       
   101     /* -- The following fields are protected by stateLock -- */
       
   102     private ChannelState state = ChannelState.UNINITIALIZED;
       
   103 
       
   104     /* Binding; Once bound the port will remain constant. */
       
   105     int port = -1;
       
   106     private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
       
   107     /* Has the channel been bound to the wildcard address */
       
   108     private boolean wildcard; /* false */
       
   109     //private InetSocketAddress remoteAddress = null;
       
   110 
       
   111     /* Input/Output open */
       
   112     private boolean readyToConnect;
       
   113 
       
   114     /* Shutdown */
       
   115     private boolean isShutdown;
       
   116 
       
   117     private Association association;
       
   118 
       
   119     private Set<SocketAddress> remoteAddresses = Collections.emptySet();
       
   120 
       
   121     /* -- End of fields protected by stateLock -- */
       
   122 
       
   123     /**
       
   124      * Constructor for normal connecting sockets
       
   125      */
       
   126     public SctpChannelImpl(SelectorProvider provider) throws IOException {
       
   127         //TODO: update provider remove public modifier
       
   128         super(provider);
       
   129         this.fd = SctpNet.socket(true);
       
   130         this.fdVal = IOUtil.fdVal(fd);
       
   131         this.state = ChannelState.UNCONNECTED;
       
   132     }
       
   133 
       
   /**
    * Constructor for sockets obtained from server sockets. Delegates to the
    * three-argument constructor without supplying an association.
    *
    * @param provider the selector provider passed on to the superclass
    * @param fd       descriptor of the already connected socket
    * @throws IOException if the delegated construction fails
    */
   public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
           throws IOException {
       this(provider, fd, null);
   }
       
   141 
       
   142     /**
       
   143      * Constructor for sockets obtained from branching
       
   144      */
       
   145     public SctpChannelImpl(SelectorProvider provider,
       
   146                            FileDescriptor fd,
       
   147                            Association association)
       
   148             throws IOException {
       
   149         super(provider);
       
   150         this.fd = fd;
       
   151         this.fdVal = IOUtil.fdVal(fd);
       
   152         this.state = ChannelState.CONNECTED;
       
   153         port = (Net.localAddress(fd)).getPort();
       
   154 
       
   155         if (association != null) { /* branched */
       
   156             this.association = association;
       
   157         } else { /* obtained from server channel */
       
   158             /* Receive COMM_UP */
       
   159             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
       
   160             try {
       
   161                 receive(buf, null, null, true);
       
   162             } finally {
       
   163                 Util.releaseTemporaryDirectBuffer(buf);
       
   164             }
       
   165         }
       
   166     }
       
   167 
       
   168     /**
       
   169      * Binds the channel's socket to a local address.
       
   170      */
       
   171     @Override
       
   172     public SctpChannel bind(SocketAddress local) throws IOException {
       
   173         synchronized (receiveLock) {
       
   174             synchronized (sendLock) {
       
   175                 synchronized (stateLock) {
       
   176                     ensureOpenAndUnconnected();
       
   177                     if (isBound())
       
   178                         SctpNet.throwAlreadyBoundException();
       
   179                     InetSocketAddress isa = (local == null) ?
       
   180                         new InetSocketAddress(0) : Net.checkAddress(local);
       
   181                     Net.bind(fd, isa.getAddress(), isa.getPort());
       
   182                     InetSocketAddress boundIsa = Net.localAddress(fd);
       
   183                     port = boundIsa.getPort();
       
   184                     localAddresses.add(isa);
       
   185                     if (isa.getAddress().isAnyLocalAddress())
       
   186                         wildcard = true;
       
   187                 }
       
   188             }
       
   189         }
       
   190         return this;
       
   191     }
       
   192 
       
   193     @Override
       
   194     public SctpChannel bindAddress(InetAddress address)
       
   195             throws IOException {
       
   196         bindUnbindAddress(address, true);
       
   197         localAddresses.add(new InetSocketAddress(address, port));
       
   198         return this;
       
   199     }
       
   200 
       
   201     @Override
       
   202     public SctpChannel unbindAddress(InetAddress address)
       
   203             throws IOException {
       
   204         bindUnbindAddress(address, false);
       
   205         localAddresses.remove(new InetSocketAddress(address, port));
       
   206         return this;
       
   207     }
       
   208 
       
   209     private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
       
   210             throws IOException {
       
   211         if (address == null)
       
   212             throw new IllegalArgumentException();
       
   213 
       
   214         synchronized (receiveLock) {
       
   215             synchronized (sendLock) {
       
   216                 synchronized (stateLock) {
       
   217                     if (!isOpen())
       
   218                         throw new ClosedChannelException();
       
   219                     if (!isBound())
       
   220                         throw new NotYetBoundException();
       
   221                     if (wildcard)
       
   222                         throw new IllegalStateException(
       
   223                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
       
   224                     if (address.isAnyLocalAddress())
       
   225                         throw new IllegalArgumentException(
       
   226                                 "Cannot add or remove the wildcard address");
       
   227                     if (add) {
       
   228                         for (InetSocketAddress addr : localAddresses) {
       
   229                             if (addr.getAddress().equals(address)) {
       
   230                                 SctpNet.throwAlreadyBoundException();
       
   231                             }
       
   232                         }
       
   233                     } else { /*removing */
       
   234                         /* Verify that there is more than one address
       
   235                          * and that address is already bound */
       
   236                         if (localAddresses.size() <= 1)
       
   237                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
       
   238                         boolean foundAddress = false;
       
   239                         for (InetSocketAddress addr : localAddresses) {
       
   240                             if (addr.getAddress().equals(address)) {
       
   241                                 foundAddress = true;
       
   242                                 break;
       
   243                             }
       
   244                         }
       
   245                         if (!foundAddress )
       
   246                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
       
   247                     }
       
   248 
       
   249                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
       
   250 
       
   251                     /* Update our internal Set to reflect the addition/removal */
       
   252                     if (add)
       
   253                         localAddresses.add(new InetSocketAddress(address, port));
       
   254                     else {
       
   255                         for (InetSocketAddress addr : localAddresses) {
       
   256                             if (addr.getAddress().equals(address)) {
       
   257                                 localAddresses.remove(addr);
       
   258                                 break;
       
   259                             }
       
   260                         }
       
   261                     }
       
   262                 }
       
   263             }
       
   264         }
       
   265         return this;
       
   266     }
       
   267 
       
   268     private boolean isBound() {
       
   269         synchronized (stateLock) {
       
   270             return port == -1 ? false : true;
       
   271         }
       
   272     }
       
   273 
       
   274     private boolean isConnected() {
       
   275         synchronized (stateLock) {
       
   276             return (state == ChannelState.CONNECTED);
       
   277         }
       
   278     }
       
   279 
       
   280     private void ensureOpenAndUnconnected() throws IOException {
       
   281         synchronized (stateLock) {
       
   282             if (!isOpen())
       
   283                 throw new ClosedChannelException();
       
   284             if (isConnected())
       
   285                 throw new AlreadyConnectedException();
       
   286             if (state == ChannelState.PENDING)
       
   287                 throw new ConnectionPendingException();
       
   288         }
       
   289     }
       
   290 
       
   291     private boolean ensureReceiveOpen() throws ClosedChannelException {
       
   292         synchronized (stateLock) {
       
   293             if (!isOpen())
       
   294                 throw new ClosedChannelException();
       
   295             if (!isConnected())
       
   296                 throw new NotYetConnectedException();
       
   297             else
       
   298                 return true;
       
   299         }
       
   300     }
       
   301 
       
   302     private void ensureSendOpen() throws ClosedChannelException {
       
   303         synchronized (stateLock) {
       
   304             if (!isOpen())
       
   305                 throw new ClosedChannelException();
       
   306             if (isShutdown)
       
   307                 throw new ClosedChannelException();
       
   308             if (!isConnected())
       
   309                 throw new NotYetConnectedException();
       
   310         }
       
   311     }
       
   312 
       
   313     private void receiverCleanup() throws IOException {
       
   314         synchronized (stateLock) {
       
   315             receiverThread = 0;
       
   316             if (state == ChannelState.KILLPENDING)
       
   317                 kill();
       
   318         }
       
   319     }
       
   320 
       
   321     private void senderCleanup() throws IOException {
       
   322         synchronized (stateLock) {
       
   323             senderThread = 0;
       
   324             if (state == ChannelState.KILLPENDING)
       
   325                 kill();
       
   326         }
       
   327     }
       
   328 
       
   329     @Override
       
   330     public Association association() throws ClosedChannelException {
       
   331         synchronized (stateLock) {
       
   332             if (!isOpen())
       
   333                 throw new ClosedChannelException();
       
   334             if (!isConnected())
       
   335                 return null;
       
   336 
       
   337             return association;
       
   338         }
       
   339     }
       
   340 
       
   341     @Override
       
   342     public boolean connect(SocketAddress endpoint) throws IOException {
       
   343         synchronized (receiveLock) {
       
   344             synchronized (sendLock) {
       
   345                 ensureOpenAndUnconnected();
       
   346                 InetSocketAddress isa = Net.checkAddress(endpoint);
       
   347                 SecurityManager sm = System.getSecurityManager();
       
   348                 if (sm != null)
       
   349                     sm.checkConnect(isa.getAddress().getHostAddress(),
       
   350                                     isa.getPort());
       
   351                 synchronized (blockingLock()) {
       
   352                     int n = 0;
       
   353                     try {
       
   354                         try {
       
   355                             begin();
       
   356                             synchronized (stateLock) {
       
   357                                 if (!isOpen()) {
       
   358                                     return false;
       
   359                                 }
       
   360                                 receiverThread = NativeThread.current();
       
   361                             }
       
   362                             for (;;) {
       
   363                                 InetAddress ia = isa.getAddress();
       
   364                                 if (ia.isAnyLocalAddress())
       
   365                                     ia = InetAddress.getLocalHost();
       
   366                                 n = SctpNet.connect(fdVal, ia, isa.getPort());
       
   367                                 if (  (n == IOStatus.INTERRUPTED)
       
   368                                       && isOpen())
       
   369                                     continue;
       
   370                                 break;
       
   371                             }
       
   372                         } finally {
       
   373                             receiverCleanup();
       
   374                             end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   375                             assert IOStatus.check(n);
       
   376                         }
       
   377                     } catch (IOException x) {
       
   378                         /* If an exception was thrown, close the channel after
       
   379                          * invoking end() so as to avoid bogus
       
   380                          * AsynchronousCloseExceptions */
       
   381                         close();
       
   382                         throw x;
       
   383                     }
       
   384 
       
   385                     if (n > 0) {
       
   386                         synchronized (stateLock) {
       
   387                             /* Connection succeeded */
       
   388                             state = ChannelState.CONNECTED;
       
   389                             if (!isBound()) {
       
   390                                 InetSocketAddress boundIsa =
       
   391                                         Net.localAddress(fd);
       
   392                                 port = boundIsa.getPort();
       
   393                             }
       
   394 
       
   395                             /* Receive COMM_UP */
       
   396                             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
       
   397                             try {
       
   398                                 receive(buf, null, null, true);
       
   399                             } finally {
       
   400                                 Util.releaseTemporaryDirectBuffer(buf);
       
   401                             }
       
   402 
       
   403                             /* cache remote addresses */
       
   404                             try {
       
   405                                 remoteAddresses = getRemoteAddresses();
       
   406                             } catch (IOException unused) { /* swallow exception */ }
       
   407 
       
   408                             return true;
       
   409                         }
       
   410                     } else  {
       
   411                         synchronized (stateLock) {
       
   412                             /* If nonblocking and no exception then connection
       
   413                              * pending; disallow another invocation */
       
   414                             if (!isBlocking())
       
   415                                 state = ChannelState.PENDING;
       
   416                             else
       
   417                                 assert false;
       
   418                         }
       
   419                     }
       
   420                 }
       
   421                 return false;
       
   422             }
       
   423         }
       
   424     }
       
   425 
       
   426     @Override
       
   427     public boolean connect(SocketAddress endpoint,
       
   428                            int maxOutStreams,
       
   429                            int maxInStreams)
       
   430             throws IOException {
       
   431         ensureOpenAndUnconnected();
       
   432         return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
       
   433                 create(maxInStreams, maxOutStreams)).connect(endpoint);
       
   434 
       
   435     }
       
   436 
       
   437     @Override
       
   438     public boolean isConnectionPending() {
       
   439         synchronized (stateLock) {
       
   440             return (state == ChannelState.PENDING);
       
   441         }
       
   442     }
       
   443 
       
   444     @Override
       
   445     public boolean finishConnect() throws IOException {
       
   446         synchronized (receiveLock) {
       
   447             synchronized (sendLock) {
       
   448                 synchronized (stateLock) {
       
   449                     if (!isOpen())
       
   450                         throw new ClosedChannelException();
       
   451                     if (isConnected())
       
   452                         return true;
       
   453                     if (state != ChannelState.PENDING)
       
   454                         throw new NoConnectionPendingException();
       
   455                 }
       
   456                 int n = 0;
       
   457                 try {
       
   458                     try {
       
   459                         begin();
       
   460                         synchronized (blockingLock()) {
       
   461                             synchronized (stateLock) {
       
   462                                 if (!isOpen()) {
       
   463                                     return false;
       
   464                                 }
       
   465                                 receiverThread = NativeThread.current();
       
   466                             }
       
   467                             if (!isBlocking()) {
       
   468                                 for (;;) {
       
   469                                     n = checkConnect(fd, false, readyToConnect);
       
   470                                     if (  (n == IOStatus.INTERRUPTED)
       
   471                                           && isOpen())
       
   472                                         continue;
       
   473                                     break;
       
   474                                 }
       
   475                             } else {
       
   476                                 for (;;) {
       
   477                                     n = checkConnect(fd, true, readyToConnect);
       
   478                                     if (n == 0) {
       
   479                                         // Loop in case of
       
   480                                         // spurious notifications
       
   481                                         continue;
       
   482                                     }
       
   483                                     if (  (n == IOStatus.INTERRUPTED)
       
   484                                           && isOpen())
       
   485                                         continue;
       
   486                                     break;
       
   487                                 }
       
   488                             }
       
   489                         }
       
   490                     } finally {
       
   491                         synchronized (stateLock) {
       
   492                             receiverThread = 0;
       
   493                             if (state == ChannelState.KILLPENDING) {
       
   494                                 kill();
       
   495                                 /* poll()/getsockopt() does not report
       
   496                                  * error (throws exception, with n = 0)
       
   497                                  * on Linux platform after dup2 and
       
   498                                  * signal-wakeup. Force n to 0 so the
       
   499                                  * end() can throw appropriate exception */
       
   500                                 n = 0;
       
   501                             }
       
   502                         }
       
   503                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   504                         assert IOStatus.check(n);
       
   505                     }
       
   506                 } catch (IOException x) {
       
   507                     /* If an exception was thrown, close the channel after
       
   508                      * invoking end() so as to avoid bogus
       
   509                      * AsynchronousCloseExceptions */
       
   510                     close();
       
   511                     throw x;
       
   512                 }
       
   513 
       
   514                 if (n > 0) {
       
   515                     synchronized (stateLock) {
       
   516                         state = ChannelState.CONNECTED;
       
   517                         if (!isBound()) {
       
   518                             InetSocketAddress boundIsa =
       
   519                                     Net.localAddress(fd);
       
   520                             port = boundIsa.getPort();
       
   521                         }
       
   522 
       
   523                         /* Receive COMM_UP */
       
   524                         ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
       
   525                         try {
       
   526                             receive(buf, null, null, true);
       
   527                         } finally {
       
   528                             Util.releaseTemporaryDirectBuffer(buf);
       
   529                         }
       
   530 
       
   531                         /* cache remote addresses */
       
   532                         try {
       
   533                             remoteAddresses = getRemoteAddresses();
       
   534                         } catch (IOException unused) { /* swallow exception */ }
       
   535 
       
   536                         return true;
       
   537                     }
       
   538                 }
       
   539             }
       
   540         }
       
   541         return false;
       
   542     }
       
   543 
       
   544     @Override
       
   545     protected void implConfigureBlocking(boolean block) throws IOException {
       
   546         IOUtil.configureBlocking(fd, block);
       
   547     }
       
   548 
       
   549     @Override
       
   550     public void implCloseSelectableChannel() throws IOException {
       
   551         synchronized (stateLock) {
       
   552             SctpNet.preClose(fdVal);
       
   553 
       
   554             if (receiverThread != 0)
       
   555                 NativeThread.signal(receiverThread);
       
   556 
       
   557             if (senderThread != 0)
       
   558                 NativeThread.signal(senderThread);
       
   559 
       
   560             if (!isRegistered())
       
   561                 kill();
       
   562         }
       
   563     }
       
   564 
       
   565     @Override
       
   566     public FileDescriptor getFD() {
       
   567         return fd;
       
   568     }
       
   569 
       
   570     @Override
       
   571     public int getFDVal() {
       
   572         return fdVal;
       
   573     }
       
   574 
       
   575     /**
       
   576      * Translates native poll revent ops into a ready operation ops
       
   577      */
       
   578     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
       
   579         int intOps = sk.nioInterestOps();
       
   580         int oldOps = sk.nioReadyOps();
       
   581         int newOps = initialOps;
       
   582 
       
   583         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
       
   584             /* This should only happen if this channel is pre-closed while a
       
   585              * selection operation is in progress
       
   586              * ## Throw an error if this channel has not been pre-closed */
       
   587             return false;
       
   588         }
       
   589 
       
   590         if ((ops & (PollArrayWrapper.POLLERR
       
   591                     | PollArrayWrapper.POLLHUP)) != 0) {
       
   592             newOps = intOps;
       
   593             sk.nioReadyOps(newOps);
       
   594             /* No need to poll again in checkConnect,
       
   595              * the error will be detected there */
       
   596             readyToConnect = true;
       
   597             return (newOps & ~oldOps) != 0;
       
   598         }
       
   599 
       
   600         if (((ops & PollArrayWrapper.POLLIN) != 0) &&
       
   601             ((intOps & SelectionKey.OP_READ) != 0) &&
       
   602             isConnected())
       
   603             newOps |= SelectionKey.OP_READ;
       
   604 
       
   605         if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
       
   606             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
       
   607             ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
       
   608             newOps |= SelectionKey.OP_CONNECT;
       
   609             readyToConnect = true;
       
   610         }
       
   611 
       
   612         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
       
   613             ((intOps & SelectionKey.OP_WRITE) != 0) &&
       
   614             isConnected())
       
   615             newOps |= SelectionKey.OP_WRITE;
       
   616 
       
   617         sk.nioReadyOps(newOps);
       
   618         return (newOps & ~oldOps) != 0;
       
   619     }
       
   620 
       
   621     @Override
       
   622     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
       
   623         return translateReadyOps(ops, sk.nioReadyOps(), sk);
       
   624     }
       
   625 
       
   626     @Override
       
   627     @SuppressWarnings("all")
       
   628     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
       
   629         return translateReadyOps(ops, 0, sk);
       
   630     }
       
   631 
       
   632     @Override
       
   633     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
       
   634         int newOps = 0;
       
   635         if ((ops & SelectionKey.OP_READ) != 0)
       
   636             newOps |= PollArrayWrapper.POLLIN;
       
   637         if ((ops & SelectionKey.OP_WRITE) != 0)
       
   638             newOps |= PollArrayWrapper.POLLOUT;
       
   639         if ((ops & SelectionKey.OP_CONNECT) != 0)
       
   640             newOps |= PollArrayWrapper.POLLCONN;
       
   641         sk.selector.putEventOps(sk, newOps);
       
   642     }
       
   643 
       
   644     @Override
       
   645     public void kill() throws IOException {
       
   646         synchronized (stateLock) {
       
   647             if (state == ChannelState.KILLED)
       
   648                 return;
       
   649             if (state == ChannelState.UNINITIALIZED) {
       
   650                 state = ChannelState.KILLED;
       
   651                 return;
       
   652             }
       
   653             assert !isOpen() && !isRegistered();
       
   654 
       
   655             /* Postpone the kill if there is a waiting reader
       
   656              * or writer thread. */
       
   657             if (receiverThread == 0 && senderThread == 0) {
       
   658                 SctpNet.close(fdVal);
       
   659                 state = ChannelState.KILLED;
       
   660             } else {
       
   661                 state = ChannelState.KILLPENDING;
       
   662             }
       
   663         }
       
   664     }
       
   665 
       
   666     @Override
       
   667     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
       
   668             throws IOException {
       
   669         if (name == null)
       
   670             throw new NullPointerException();
       
   671         if (!supportedOptions().contains(name))
       
   672             throw new UnsupportedOperationException("'" + name + "' not supported");
       
   673 
       
   674         synchronized (stateLock) {
       
   675             if (!isOpen())
       
   676                 throw new ClosedChannelException();
       
   677 
       
   678             SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
       
   679         }
       
   680         return this;
       
   681     }
       
   682 
       
   683     @Override
       
   684     @SuppressWarnings("unchecked")
       
   685     public <T> T getOption(SctpSocketOption<T> name) throws IOException {
       
   686         if (name == null)
       
   687             throw new NullPointerException();
       
   688         if (!supportedOptions().contains(name))
       
   689             throw new UnsupportedOperationException("'" + name + "' not supported");
       
   690 
       
   691         synchronized (stateLock) {
       
   692             if (!isOpen())
       
   693                 throw new ClosedChannelException();
       
   694 
       
   695             return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
       
   696         }
       
   697     }
       
   698 
       
   699     private static class DefaultOptionsHolder {
       
   700         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
       
   701 
       
   702         private static Set<SctpSocketOption<?>> defaultOptions() {
       
   703             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
       
   704             set.add(SCTP_DISABLE_FRAGMENTS);
       
   705             set.add(SCTP_EXPLICIT_COMPLETE);
       
   706             set.add(SCTP_FRAGMENT_INTERLEAVE);
       
   707             set.add(SCTP_INIT_MAXSTREAMS);
       
   708             set.add(SCTP_NODELAY);
       
   709             set.add(SCTP_PRIMARY_ADDR);
       
   710             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
       
   711             set.add(SO_SNDBUF);
       
   712             set.add(SO_RCVBUF);
       
   713             set.add(SO_LINGER);
       
   714             return Collections.unmodifiableSet(set);
       
   715         }
       
   716     }
       
   717 
       
   718     @Override
       
   719     public final Set<SctpSocketOption<?>> supportedOptions() {
       
   720         return DefaultOptionsHolder.defaultOptions;
       
   721     }
       
   722 
       
   723     @Override
       
   724     public <T> MessageInfo receive(ByteBuffer buffer,
       
   725                                    T attachment,
       
   726                                    NotificationHandler<T> handler)
       
   727             throws IOException {
       
   728         return receive(buffer, attachment, handler, false);
       
   729     }
       
   730 
       
   731     private <T> MessageInfo receive(ByteBuffer buffer,
       
   732                                     T attachment,
       
   733                                     NotificationHandler<T> handler,
       
   734                                     boolean fromConnect)
       
   735             throws IOException {
       
   736         if (buffer == null)
       
   737             throw new IllegalArgumentException("buffer cannot be null");
       
   738 
       
   739         if (buffer.isReadOnly())
       
   740             throw new IllegalArgumentException("Read-only buffer");
       
   741 
       
   742         if (receiveInvoked.get())
       
   743             throw new IllegalReceiveException(
       
   744                     "cannot invoke receive from handler");
       
   745         receiveInvoked.set(Boolean.TRUE);
       
   746 
       
   747         try {
       
   748             SctpResultContainer resultContainer = new SctpResultContainer();
       
   749             do {
       
   750                 resultContainer.clear();
       
   751                 synchronized (receiveLock) {
       
   752                     if (!ensureReceiveOpen())
       
   753                         return null;
       
   754 
       
   755                     int n = 0;
       
   756                     try {
       
   757                         begin();
       
   758 
       
   759                         synchronized (stateLock) {
       
   760                             if(!isOpen())
       
   761                                 return null;
       
   762                             receiverThread = NativeThread.current();
       
   763                         }
       
   764 
       
   765                         do {
       
   766                             n = receive(fdVal, buffer, resultContainer, fromConnect);
       
   767                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
       
   768                     } finally {
       
   769                         receiverCleanup();
       
   770                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   771                         assert IOStatus.check(n);
       
   772                     }
       
   773 
       
   774                     if (!resultContainer.isNotification()) {
       
   775                         /* message or nothing */
       
   776                         if (resultContainer.hasSomething()) {
       
   777                             /* Set the association before returning */
       
   778                             SctpMessageInfoImpl info =
       
   779                                     resultContainer.getMessageInfo();
       
   780                             synchronized (stateLock) {
       
   781                                 assert association != null;
       
   782                                 info.setAssociation(association);
       
   783                             }
       
   784                             return info;
       
   785                         } else
       
   786                             /* Non-blocking may return null if nothing available*/
       
   787                             return null;
       
   788                     } else { /* notification */
       
   789                         synchronized (stateLock) {
       
   790                             handleNotificationInternal(
       
   791                                     resultContainer);
       
   792                         }
       
   793                     }
       
   794 
       
   795                     if (fromConnect)  {
       
   796                         /* If we reach here, then it was connect that invoked
       
   797                          * receive and received the COMM_UP. We have already
       
   798                          * handled the COMM_UP with the internal notification
       
   799                          * handler. Simply return. */
       
   800                         return null;
       
   801                     }
       
   802                 }  /* receiveLock */
       
   803             } while (handler == null ? true :
       
   804                 (invokeNotificationHandler(resultContainer, handler, attachment)
       
   805                  == HandlerResult.CONTINUE));
       
   806 
       
   807             return null;
       
   808         } finally {
       
   809             receiveInvoked.set(Boolean.FALSE);
       
   810         }
       
   811     }
       
   812 
       
   813     private int receive(int fd,
       
   814                         ByteBuffer dst,
       
   815                         SctpResultContainer resultContainer,
       
   816                         boolean peek)
       
   817             throws IOException {
       
   818         int pos = dst.position();
       
   819         int lim = dst.limit();
       
   820         assert (pos <= lim);
       
   821         int rem = (pos <= lim ? lim - pos : 0);
       
   822         if (dst instanceof DirectBuffer && rem > 0)
       
   823             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
       
   824 
       
   825         /* Substitute a native buffer */
       
   826         int newSize = Math.max(rem, 1);
       
   827         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
       
   828         try {
       
   829             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
       
   830             bb.flip();
       
   831             if (n > 0 && rem > 0)
       
   832                 dst.put(bb);
       
   833             return n;
       
   834         } finally {
       
   835             Util.releaseTemporaryDirectBuffer(bb);
       
   836         }
       
   837     }
       
   838 
       
   839     private int receiveIntoNativeBuffer(int fd,
       
   840                                         SctpResultContainer resultContainer,
       
   841                                         ByteBuffer bb,
       
   842                                         int rem,
       
   843                                         int pos,
       
   844                                         boolean peek)
       
   845         throws IOException
       
   846     {
       
   847         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
       
   848 
       
   849         if (n > 0)
       
   850             bb.position(pos + n);
       
   851         return n;
       
   852     }
       
   853 
       
   854     private InternalNotificationHandler internalNotificationHandler =
       
   855             new InternalNotificationHandler();
       
   856 
       
   857     private void handleNotificationInternal(SctpResultContainer resultContainer)
       
   858     {
       
   859         invokeNotificationHandler(resultContainer,
       
   860                 internalNotificationHandler, null);
       
   861     }
       
   862 
       
   863     private class InternalNotificationHandler
       
   864             extends AbstractNotificationHandler<Object>
       
   865     {
       
   866         @Override
       
   867         public HandlerResult handleNotification(
       
   868                 AssociationChangeNotification not, Object unused) {
       
   869             if (not.event().equals(
       
   870                     AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
       
   871                     association == null) {
       
   872                 SctpAssocChange sac = (SctpAssocChange) not;
       
   873                 association = new SctpAssociationImpl
       
   874                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
       
   875             }
       
   876             return HandlerResult.CONTINUE;
       
   877         }
       
   878     }
       
   879 
       
   880     private <T> HandlerResult invokeNotificationHandler
       
   881                                  (SctpResultContainer resultContainer,
       
   882                                   NotificationHandler<T> handler,
       
   883                                   T attachment) {
       
   884         SctpNotification notification = resultContainer.notification();
       
   885         synchronized (stateLock) {
       
   886             notification.setAssociation(association);
       
   887         }
       
   888 
       
   889         if (!(handler instanceof AbstractNotificationHandler)) {
       
   890             return handler.handleNotification(notification, attachment);
       
   891         }
       
   892 
       
   893         /* AbstractNotificationHandler */
       
   894         AbstractNotificationHandler<T> absHandler =
       
   895                 (AbstractNotificationHandler<T>)handler;
       
   896         switch(resultContainer.type()) {
       
   897             case ASSOCIATION_CHANGED :
       
   898                 return absHandler.handleNotification(
       
   899                         resultContainer.getAssociationChanged(), attachment);
       
   900             case PEER_ADDRESS_CHANGED :
       
   901                 return absHandler.handleNotification(
       
   902                         resultContainer.getPeerAddressChanged(), attachment);
       
   903             case SEND_FAILED :
       
   904                 return absHandler.handleNotification(
       
   905                         resultContainer.getSendFailed(), attachment);
       
   906             case SHUTDOWN :
       
   907                 return absHandler.handleNotification(
       
   908                         resultContainer.getShutdown(), attachment);
       
   909             default :
       
   910                 /* implementation specific handlers */
       
   911                 return absHandler.handleNotification(
       
   912                         resultContainer.notification(), attachment);
       
   913         }
       
   914     }
       
   915 
       
   916     private void checkAssociation(Association sendAssociation) {
       
   917         synchronized (stateLock) {
       
   918             if (sendAssociation != null && !sendAssociation.equals(association)) {
       
   919                 throw new IllegalArgumentException(
       
   920                         "Cannot send to another association");
       
   921             }
       
   922         }
       
   923     }
       
   924 
       
   925     private void checkStreamNumber(int streamNumber) {
       
   926         synchronized (stateLock) {
       
   927             if (association != null) {
       
   928                 if (streamNumber < 0 ||
       
   929                       streamNumber >= association.maxOutboundStreams())
       
   930                     throw new InvalidStreamException();
       
   931             }
       
   932         }
       
   933     }
       
   934 
       
   935     /* TODO: Add support for ttl and isComplete to both 121 12M
       
   936      *       SCTP_EOR not yet supported on reference platforms
       
   937      *       TTL support limited...
       
   938      */
       
   939     @Override
       
   940     public int send(ByteBuffer buffer, MessageInfo messageInfo)
       
   941             throws IOException {
       
   942         if (buffer == null)
       
   943             throw new IllegalArgumentException("buffer cannot be null");
       
   944 
       
   945         if (messageInfo == null)
       
   946             throw new IllegalArgumentException("messageInfo cannot be null");
       
   947 
       
   948         checkAssociation(messageInfo.association());
       
   949         checkStreamNumber(messageInfo.streamNumber());
       
   950 
       
   951         synchronized (sendLock) {
       
   952             ensureSendOpen();
       
   953 
       
   954             int n = 0;
       
   955             try {
       
   956                 begin();
       
   957 
       
   958                 synchronized (stateLock) {
       
   959                     if(!isOpen())
       
   960                         return 0;
       
   961                     senderThread = NativeThread.current();
       
   962                 }
       
   963 
       
   964                 do {
       
   965                     n = send(fdVal, buffer, messageInfo);
       
   966                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
       
   967 
       
   968                 return IOStatus.normalize(n);
       
   969             } finally {
       
   970                 senderCleanup();
       
   971                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   972                 assert IOStatus.check(n);
       
   973             }
       
   974         }
       
   975     }
       
   976 
       
   977     private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
       
   978             throws IOException {
       
   979         int streamNumber = messageInfo.streamNumber();
       
   980         SocketAddress target = messageInfo.address();
       
   981         boolean unordered = messageInfo.isUnordered();
       
   982         int ppid = messageInfo.payloadProtocolID();
       
   983 
       
   984         if (src instanceof DirectBuffer)
       
   985             return sendFromNativeBuffer(fd, src, target, streamNumber,
       
   986                     unordered, ppid);
       
   987 
       
   988         /* Substitute a native buffer */
       
   989         int pos = src.position();
       
   990         int lim = src.limit();
       
   991         assert (pos <= lim && streamNumber >= 0);
       
   992 
       
   993         int rem = (pos <= lim ? lim - pos : 0);
       
   994         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
       
   995         try {
       
   996             bb.put(src);
       
   997             bb.flip();
       
   998             /* Do not update src until we see how many bytes were written */
       
   999             src.position(pos);
       
  1000 
       
  1001             int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
       
  1002                     unordered, ppid);
       
  1003             if (n > 0) {
       
  1004                 /* now update src */
       
  1005                 src.position(pos + n);
       
  1006             }
       
  1007             return n;
       
  1008         } finally {
       
  1009             Util.releaseTemporaryDirectBuffer(bb);
       
  1010         }
       
  1011     }
       
  1012 
       
  1013     private int sendFromNativeBuffer(int fd,
       
  1014                                      ByteBuffer bb,
       
  1015                                      SocketAddress target,
       
  1016                                      int streamNumber,
       
  1017                                      boolean unordered,
       
  1018                                      int ppid)
       
  1019             throws IOException {
       
  1020         int pos = bb.position();
       
  1021         int lim = bb.limit();
       
  1022         assert (pos <= lim);
       
  1023         int rem = (pos <= lim ? lim - pos : 0);
       
  1024 
       
  1025         int written = send0(fd, ((DirectBuffer)bb).address() + pos,
       
  1026                             rem, target, -1 /*121*/, streamNumber, unordered, ppid);
       
  1027         if (written > 0)
       
  1028             bb.position(pos + written);
       
  1029         return written;
       
  1030     }
       
  1031 
       
  1032     @Override
       
  1033     public SctpChannel shutdown() throws IOException {
       
  1034         synchronized(stateLock) {
       
  1035             if (isShutdown)
       
  1036                 return this;
       
  1037 
       
  1038             ensureSendOpen();
       
  1039             SctpNet.shutdown(fdVal, -1);
       
  1040             if (senderThread != 0)
       
  1041                 NativeThread.signal(senderThread);
       
  1042             isShutdown = true;
       
  1043         }
       
  1044         return this;
       
  1045     }
       
  1046 
       
  1047     @Override
       
  1048     public Set<SocketAddress> getAllLocalAddresses()
       
  1049             throws IOException {
       
  1050         synchronized (stateLock) {
       
  1051             if (!isOpen())
       
  1052                 throw new ClosedChannelException();
       
  1053             if (!isBound())
       
  1054                 return Collections.emptySet();
       
  1055 
       
  1056             return SctpNet.getLocalAddresses(fdVal);
       
  1057         }
       
  1058     }
       
  1059 
       
  1060     @Override
       
  1061     public Set<SocketAddress> getRemoteAddresses()
       
  1062             throws IOException {
       
  1063         synchronized (stateLock) {
       
  1064             if (!isOpen())
       
  1065                 throw new ClosedChannelException();
       
  1066             if (!isConnected() || isShutdown)
       
  1067                 return Collections.emptySet();
       
  1068 
       
  1069             try {
       
  1070                 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
       
  1071             } catch (SocketException unused) {
       
  1072                 /* an open connected channel should always have remote addresses */
       
  1073                 return remoteAddresses;
       
  1074             }
       
  1075         }
       
  1076     }
       
  1077 
       
  1078     /* Native */
       
  1079     private static native void initIDs();
       
  1080 
       
  1081     static native int receive0(int fd, SctpResultContainer resultContainer,
       
  1082             long address, int length, boolean peek) throws IOException;
       
  1083 
       
  1084     static native int send0(int fd, long address, int length,
       
  1085             SocketAddress target, int assocId, int streamNumber,
       
  1086             boolean unordered, int ppid) throws IOException;
       
  1087 
       
  1088     private static native int checkConnect(FileDescriptor fd, boolean block,
       
  1089             boolean ready) throws IOException;
       
  1090 
       
  1091     static {
       
  1092         Util.load();   /* loads nio & net native libraries */
       
  1093         java.security.AccessController.doPrivileged(
       
  1094                 new sun.security.action.LoadLibraryAction("sctp"));
       
  1095         initIDs();
       
  1096     }
       
  1097 }