jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java
changeset 12128 09f5d262e329
parent 12127 eab1e4be8495
parent 11872 c51754cddc03
child 12129 561811d8ba18
equal deleted inserted replaced
12127:eab1e4be8495 12128:09f5d262e329
     1 /*
       
     2  * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package sun.nio.ch;
       
    26 
       
    27 import java.net.InetAddress;
       
    28 import java.net.SocketAddress;
       
    29 import java.net.SocketException;
       
    30 import java.net.InetSocketAddress;
       
    31 import java.io.FileDescriptor;
       
    32 import java.io.IOException;
       
    33 import java.util.Collections;
       
    34 import java.util.Map.Entry;
       
    35 import java.util.Iterator;
       
    36 import java.util.Set;
       
    37 import java.util.HashSet;
       
    38 import java.util.HashMap;
       
    39 import java.nio.ByteBuffer;
       
    40 import java.nio.channels.SelectionKey;
       
    41 import java.nio.channels.ClosedChannelException;
       
    42 import java.nio.channels.NotYetBoundException;
       
    43 import java.nio.channels.spi.SelectorProvider;
       
    44 import com.sun.nio.sctp.AbstractNotificationHandler;
       
    45 import com.sun.nio.sctp.Association;
       
    46 import com.sun.nio.sctp.AssociationChangeNotification;
       
    47 import com.sun.nio.sctp.HandlerResult;
       
    48 import com.sun.nio.sctp.IllegalReceiveException;
       
    49 import com.sun.nio.sctp.InvalidStreamException;
       
    50 import com.sun.nio.sctp.IllegalUnbindException;
       
    51 import com.sun.nio.sctp.NotificationHandler;
       
    52 import com.sun.nio.sctp.MessageInfo;
       
    53 import com.sun.nio.sctp.SctpChannel;
       
    54 import com.sun.nio.sctp.SctpMultiChannel;
       
    55 import com.sun.nio.sctp.SctpSocketOption;
       
    56 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
       
    57 import static sun.nio.ch.SctpResultContainer.*;
       
    58 
       
    59 /**
       
    60  * An implementation of SctpMultiChannel
       
    61  */
       
    62 public class SctpMultiChannelImpl extends SctpMultiChannel
       
    63     implements SelChImpl
       
    64 {
       
    /* File descriptor of the underlying socket, created in the constructor
     * by SctpNet.socket */
    private final FileDescriptor fd;

    /* Integer value of fd, as returned by IOUtil.fdVal */
    private final int fdVal;

    /* IDs of native threads doing send and receives, for signalling.
     * A value of 0 means that no thread is currently recorded. */
    private volatile long receiverThread = 0;
    private volatile long senderThread = 0;

    /* Lock held by current receiving thread */
    private final Object receiveLock = new Object();

    /* Lock held by current sending thread */
    private final Object sendLock = new Object();

    /* Lock held by any thread that modifies the state fields declared below
     * DO NOT invoke a blocking I/O operation while holding this lock! */
    private final Object stateLock = new Object();

    /* Life-cycle states consulted by kill() and by the receiver/sender
     * cleanup methods. */
    private enum ChannelState {
        UNINITIALIZED,
        KILLPENDING,
        KILLED,
    }

    /* -- The following fields are protected by stateLock -- */
    private ChannelState state = ChannelState.UNINITIALIZED;

    /* Binding: Once bound the port will remain constant.
     * -1 means that the channel has not been bound yet (see isBound). */
    int port = -1;
    /* Local addresses this channel has been bound to via bind and
     * bindAddress/unbindAddress */
    private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
    /* Has the channel been bound to the wildcard address */
    private boolean wildcard; /* false */

    /* Keeps a map of addresses to association, and vice versa */
    private HashMap<SocketAddress, Association> addressMap =
                         new HashMap<SocketAddress, Association>();
    private HashMap<Association, Set<SocketAddress>> associationMap =
                         new HashMap<Association, Set<SocketAddress>>();

    /* -- End of fields protected by stateLock -- */

    /* If an association has been shutdown mark it for removal after
     * the user handler has been invoked. Per-thread; initially null. */
    private final ThreadLocal<Association> associationToRemove =
        new ThreadLocal<Association>() {
             @Override protected Association initialValue() {
                 return null;
            }
    };

    /* A notification handler cannot invoke receive. This per-thread flag is
     * TRUE while the current thread is inside receive; initially FALSE. */
    private final ThreadLocal<Boolean> receiveInvoked =
        new ThreadLocal<Boolean>() {
             @Override protected Boolean initialValue() {
                 return Boolean.FALSE;
            }
    };
       
   122 
       
   123     public SctpMultiChannelImpl(SelectorProvider provider)
       
   124             throws IOException {
       
   125         //TODO: update provider, remove public modifier
       
   126         super(provider);
       
   127         this.fd = SctpNet.socket(false /*one-to-many*/);
       
   128         this.fdVal = IOUtil.fdVal(fd);
       
   129     }
       
   130 
       
   131     @Override
       
   132     public SctpMultiChannel bind(SocketAddress local, int backlog)
       
   133             throws IOException {
       
   134         synchronized (receiveLock) {
       
   135             synchronized (sendLock) {
       
   136                 synchronized (stateLock) {
       
   137                     ensureOpen();
       
   138                     if (isBound())
       
   139                         SctpNet.throwAlreadyBoundException();
       
   140                     InetSocketAddress isa = (local == null) ?
       
   141                         new InetSocketAddress(0) : Net.checkAddress(local);
       
   142 
       
   143                     SecurityManager sm = System.getSecurityManager();
       
   144                     if (sm != null)
       
   145                         sm.checkListen(isa.getPort());
       
   146                     Net.bind(fd, isa.getAddress(), isa.getPort());
       
   147 
       
   148                     InetSocketAddress boundIsa = Net.localAddress(fd);
       
   149                     port = boundIsa.getPort();
       
   150                     localAddresses.add(isa);
       
   151                     if (isa.getAddress().isAnyLocalAddress())
       
   152                         wildcard = true;
       
   153 
       
   154                     SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog);
       
   155                 }
       
   156             }
       
   157         }
       
   158         return this;
       
   159     }
       
   160 
       
   161     @Override
       
   162     public SctpMultiChannel bindAddress(InetAddress address)
       
   163             throws IOException {
       
   164         return bindUnbindAddress(address, true);
       
   165     }
       
   166 
       
   167     @Override
       
   168     public SctpMultiChannel unbindAddress(InetAddress address)
       
   169             throws IOException {
       
   170         return bindUnbindAddress(address, false);
       
   171     }
       
   172 
       
   173     private SctpMultiChannel bindUnbindAddress(InetAddress address,
       
   174                                                boolean add)
       
   175             throws IOException {
       
   176         if (address == null)
       
   177             throw new IllegalArgumentException();
       
   178 
       
   179         synchronized (receiveLock) {
       
   180             synchronized (sendLock) {
       
   181                 synchronized (stateLock) {
       
   182                     if (!isOpen())
       
   183                         throw new ClosedChannelException();
       
   184                     if (!isBound())
       
   185                         throw new NotYetBoundException();
       
   186                     if (wildcard)
       
   187                         throw new IllegalStateException(
       
   188                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
       
   189                     if (address.isAnyLocalAddress())
       
   190                         throw new IllegalArgumentException(
       
   191                                 "Cannot add or remove the wildcard address");
       
   192                     if (add) {
       
   193                         for (InetSocketAddress addr : localAddresses) {
       
   194                             if (addr.getAddress().equals(address)) {
       
   195                                 SctpNet.throwAlreadyBoundException();
       
   196                             }
       
   197                         }
       
   198                     } else { /*removing */
       
   199                         /* Verify that there is more than one address
       
   200                          * and that address is already bound */
       
   201                         if (localAddresses.size() <= 1)
       
   202                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
       
   203                         boolean foundAddress = false;
       
   204                         for (InetSocketAddress addr : localAddresses) {
       
   205                             if (addr.getAddress().equals(address)) {
       
   206                                 foundAddress = true;
       
   207                                 break;
       
   208                             }
       
   209                         }
       
   210                         if (!foundAddress )
       
   211                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
       
   212                     }
       
   213 
       
   214                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
       
   215 
       
   216                     /* Update our internal Set to reflect the addition/removal */
       
   217                     if (add)
       
   218                         localAddresses.add(new InetSocketAddress(address, port));
       
   219                     else {
       
   220                         for (InetSocketAddress addr : localAddresses) {
       
   221                             if (addr.getAddress().equals(address)) {
       
   222                                 localAddresses.remove(addr);
       
   223                                 break;
       
   224                             }
       
   225                         }
       
   226                     }
       
   227                 }
       
   228             }
       
   229         }
       
   230         return this;
       
   231     }
       
   232 
       
   233     @Override
       
   234     public Set<Association> associations()
       
   235             throws ClosedChannelException, NotYetBoundException {
       
   236         synchronized (stateLock) {
       
   237             if (!isOpen())
       
   238                 throw new ClosedChannelException();
       
   239             if (!isBound())
       
   240                 throw new NotYetBoundException();
       
   241 
       
   242             return Collections.unmodifiableSet(associationMap.keySet());
       
   243         }
       
   244     }
       
   245 
       
   246     private boolean isBound() {
       
   247         synchronized (stateLock) {
       
   248             return port == -1 ? false : true;
       
   249         }
       
   250     }
       
   251 
       
   252     private void ensureOpen() throws IOException {
       
   253         synchronized (stateLock) {
       
   254             if (!isOpen())
       
   255                 throw new ClosedChannelException();
       
   256         }
       
   257     }
       
   258 
       
   259     private void receiverCleanup() throws IOException {
       
   260         synchronized (stateLock) {
       
   261             receiverThread = 0;
       
   262             if (state == ChannelState.KILLPENDING)
       
   263                 kill();
       
   264         }
       
   265     }
       
   266 
       
   267     private void senderCleanup() throws IOException {
       
   268         synchronized (stateLock) {
       
   269             senderThread = 0;
       
   270             if (state == ChannelState.KILLPENDING)
       
   271                 kill();
       
   272         }
       
   273     }
       
   274 
       
   275     @Override
       
   276     protected void implConfigureBlocking(boolean block) throws IOException {
       
   277         IOUtil.configureBlocking(fd, block);
       
   278     }
       
   279 
       
   280     @Override
       
   281     public void implCloseSelectableChannel() throws IOException {
       
   282         synchronized (stateLock) {
       
   283             SctpNet.preClose(fdVal);
       
   284 
       
   285             if (receiverThread != 0)
       
   286                 NativeThread.signal(receiverThread);
       
   287 
       
   288             if (senderThread != 0)
       
   289                 NativeThread.signal(senderThread);
       
   290 
       
   291             if (!isRegistered())
       
   292                 kill();
       
   293         }
       
   294     }
       
   295 
       
   296     @Override
       
   297     public FileDescriptor getFD() {
       
   298         return fd;
       
   299     }
       
   300 
       
   301     @Override
       
   302     public int getFDVal() {
       
   303         return fdVal;
       
   304     }
       
   305 
       
   306     /**
       
   307      * Translates native poll revent ops into a ready operation ops
       
   308      */
       
   309     private boolean translateReadyOps(int ops, int initialOps,
       
   310                                       SelectionKeyImpl sk) {
       
   311         int intOps = sk.nioInterestOps();
       
   312         int oldOps = sk.nioReadyOps();
       
   313         int newOps = initialOps;
       
   314 
       
   315         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
       
   316             /* This should only happen if this channel is pre-closed while a
       
   317              * selection operation is in progress
       
   318              * ## Throw an error if this channel has not been pre-closed */
       
   319             return false;
       
   320         }
       
   321 
       
   322         if ((ops & (PollArrayWrapper.POLLERR
       
   323                     | PollArrayWrapper.POLLHUP)) != 0) {
       
   324             newOps = intOps;
       
   325             sk.nioReadyOps(newOps);
       
   326             return (newOps & ~oldOps) != 0;
       
   327         }
       
   328 
       
   329         if (((ops & PollArrayWrapper.POLLIN) != 0) &&
       
   330             ((intOps & SelectionKey.OP_READ) != 0))
       
   331             newOps |= SelectionKey.OP_READ;
       
   332 
       
   333         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
       
   334             ((intOps & SelectionKey.OP_WRITE) != 0))
       
   335             newOps |= SelectionKey.OP_WRITE;
       
   336 
       
   337         sk.nioReadyOps(newOps);
       
   338         return (newOps & ~oldOps) != 0;
       
   339     }
       
   340 
       
   341     @Override
       
   342     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
       
   343         return translateReadyOps(ops, sk.nioReadyOps(), sk);
       
   344     }
       
   345 
       
   346     @Override
       
   347     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
       
   348         return translateReadyOps(ops, 0, sk);
       
   349     }
       
   350 
       
   351     @Override
       
   352     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
       
   353         int newOps = 0;
       
   354         if ((ops & SelectionKey.OP_READ) != 0)
       
   355             newOps |= PollArrayWrapper.POLLIN;
       
   356         if ((ops & SelectionKey.OP_WRITE) != 0)
       
   357             newOps |= PollArrayWrapper.POLLOUT;
       
   358         sk.selector.putEventOps(sk, newOps);
       
   359     }
       
   360 
       
   361     @Override
       
   362     public void kill() throws IOException {
       
   363         synchronized (stateLock) {
       
   364             if (state == ChannelState.KILLED)
       
   365                 return;
       
   366             if (state == ChannelState.UNINITIALIZED) {
       
   367                 state = ChannelState.KILLED;
       
   368                 return;
       
   369             }
       
   370             assert !isOpen() && !isRegistered();
       
   371 
       
   372             /* Postpone the kill if there is a thread sending or receiving. */
       
   373             if (receiverThread == 0 && senderThread == 0) {
       
   374                 SctpNet.close(fdVal);
       
   375                 state = ChannelState.KILLED;
       
   376             } else {
       
   377                 state = ChannelState.KILLPENDING;
       
   378             }
       
   379         }
       
   380     }
       
   381 
       
   382     @Override
       
   383     public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
       
   384                                           T value,
       
   385                                           Association association)
       
   386             throws IOException {
       
   387         if (name == null)
       
   388             throw new NullPointerException();
       
   389         if (!(supportedOptions().contains(name)))
       
   390             throw new UnsupportedOperationException("'" + name + "' not supported");
       
   391 
       
   392         synchronized (stateLock) {
       
   393             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
       
   394                     name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
       
   395                 checkAssociation(association);
       
   396             }
       
   397             if (!isOpen())
       
   398                 throw new ClosedChannelException();
       
   399 
       
   400             int assocId = association == null ? 0 : association.associationID();
       
   401             SctpNet.setSocketOption(fdVal, name, value, assocId);
       
   402         }
       
   403         return this;
       
   404     }
       
   405 
       
   406     @Override
       
   407     @SuppressWarnings("unchecked")
       
   408     public <T> T getOption(SctpSocketOption<T> name, Association association)
       
   409             throws IOException {
       
   410         if (name == null)
       
   411             throw new NullPointerException();
       
   412         if (!supportedOptions().contains(name))
       
   413             throw new UnsupportedOperationException("'" + name + "' not supported");
       
   414 
       
   415         synchronized (stateLock) {
       
   416             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
       
   417                     name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
       
   418                 checkAssociation(association);
       
   419             }
       
   420             if (!isOpen())
       
   421                 throw new ClosedChannelException();
       
   422 
       
   423             int assocId = association == null ? 0 : association.associationID();
       
   424             return (T)SctpNet.getSocketOption(fdVal, name, assocId);
       
   425         }
       
   426     }
       
   427 
       
   428     private static class DefaultOptionsHolder {
       
   429         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
       
   430 
       
   431         private static Set<SctpSocketOption<?>> defaultOptions() {
       
   432             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
       
   433             set.add(SCTP_DISABLE_FRAGMENTS);
       
   434             set.add(SCTP_EXPLICIT_COMPLETE);
       
   435             set.add(SCTP_FRAGMENT_INTERLEAVE);
       
   436             set.add(SCTP_INIT_MAXSTREAMS);
       
   437             set.add(SCTP_NODELAY);
       
   438             set.add(SCTP_PRIMARY_ADDR);
       
   439             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
       
   440             set.add(SO_SNDBUF);
       
   441             set.add(SO_RCVBUF);
       
   442             set.add(SO_LINGER);
       
   443             return Collections.unmodifiableSet(set);
       
   444         }
       
   445     }
       
   446 
       
   447     @Override
       
   448     public final Set<SctpSocketOption<?>> supportedOptions() {
       
   449         return DefaultOptionsHolder.defaultOptions;
       
   450     }
       
   451 
       
    /**
     * Receives a message into the given buffer, or processes notifications,
     * via this channel.
     *
     * <p> Each pass performs one native receive while holding receiveLock.
     * A message is returned to the caller with its association set. A
     * notification is first handled internally under stateLock and then
     * offered to the supplied handler; the loop repeats while the handler
     * returns CONTINUE, or unconditionally when handler is null.
     *
     * @param buffer     the buffer to receive into; must be non-null and
     *                   writable
     * @param attachment the object passed through to the handler
     * @param handler    the handler invoked for notifications, or null
     * @return the message information; or null if nothing was received, if
     *         the channel was found closed just before reading, or if the
     *         handler returned something other than CONTINUE
     * @throws IllegalArgumentException if buffer is null or read-only
     * @throws IllegalReceiveException  if the calling thread is already
     *                                  inside receive (for example, when
     *                                  called from a notification handler)
     * @throws NotYetBoundException     if the channel has not been bound
     * @throws SecurityException        if a security manager rejects the
     *                                  sender of a message from a peer that is
     *                                  not in addressMap; the buffer is
     *                                  cleared first
     * @throws IOException              if the channel is closed or the
     *                                  receive fails
     */
    @Override
    public <T> MessageInfo receive(ByteBuffer buffer,
                                   T attachment,
                                   NotificationHandler<T> handler)
            throws IOException {
        if (buffer == null)
            throw new IllegalArgumentException("buffer cannot be null");

        if (buffer.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");

        /* Per-thread re-entrancy guard; reset in the outer finally below. */
        if (receiveInvoked.get())
            throw new IllegalReceiveException(
                    "cannot invoke receive from handler");
        receiveInvoked.set(Boolean.TRUE);

        try {
            SctpResultContainer resultContainer = new SctpResultContainer();
            do {
                /* Start every pass with an empty result. */
                resultContainer.clear();
                synchronized (receiveLock) {
                    ensureOpen();
                    if (!isBound())
                        throw new NotYetBoundException();

                    int n = 0;
                    try {
                        begin();

                        /* Record this thread so that close can signal it. */
                        synchronized (stateLock) {
                            if(!isOpen())
                                return null;
                            receiverThread = NativeThread.current();
                        }

                        /* Retry the native receive while it reports
                         * INTERRUPTED and the channel is still open. */
                        do {
                            n = receive(fdVal, buffer, resultContainer);
                        } while ((n == IOStatus.INTERRUPTED) && isOpen());

                    } finally {
                        /* Clears receiverThread and completes a postponed
                         * kill, if any, before end() is called. */
                        receiverCleanup();
                        end((n > 0) || (n == IOStatus.UNAVAILABLE));
                        assert IOStatus.check(n);
                    }

                    if (!resultContainer.isNotification()) {
                        /* message or nothing */
                        if (resultContainer.hasSomething()) {
                            /* Set the association before returning */
                            SctpMessageInfoImpl info =
                                    resultContainer.getMessageInfo();
                            info.setAssociation(lookupAssociation(info.
                                    associationID()));
                            SecurityManager sm = System.getSecurityManager();
                            if (sm != null) {
                                InetSocketAddress isa  = (InetSocketAddress)info.address();
                                /* NOTE(review): addressMap is documented as
                                 * protected by stateLock, but it is read here
                                 * while holding only receiveLock - confirm
                                 * whether this is intentional. */
                                if (!addressMap.containsKey(isa)) {
                                    /* must be a new association */
                                    try {
                                        sm.checkAccept(isa.getAddress().getHostAddress(),
                                                       isa.getPort());
                                    } catch (SecurityException se) {
                                        /* Reset the buffer before rethrowing. */
                                        buffer.clear();
                                        throw se;
                                    }
                                }
                            }

                            assert info.association() != null;
                            return info;
                        } else  {
                          /* Non-blocking may return null if nothing available*/
                            return null;
                        }
                    } else { /* notification */
                        /* Internal bookkeeping runs under stateLock; the
                         * user's handler is invoked in the loop condition
                         * below, after both locks have been released. */
                        synchronized (stateLock) {
                            handleNotificationInternal(
                                    resultContainer);
                        }
                    }
                } /* receiveLock */
            } while (handler == null ? true :
                (invokeNotificationHandler(resultContainer, handler, attachment)
                 == HandlerResult.CONTINUE));
        } finally {
            receiveInvoked.set(Boolean.FALSE);
        }

        return null;
    }
       
   542 
       
   543     private int receive(int fd,
       
   544                         ByteBuffer dst,
       
   545                         SctpResultContainer resultContainer)
       
   546             throws IOException {
       
   547         int pos = dst.position();
       
   548         int lim = dst.limit();
       
   549         assert (pos <= lim);
       
   550         int rem = (pos <= lim ? lim - pos : 0);
       
   551         if (dst instanceof DirectBuffer && rem > 0)
       
   552             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
       
   553 
       
   554         /* Substitute a native buffer. */
       
   555         int newSize = Math.max(rem, 1);
       
   556         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
       
   557         try {
       
   558             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
       
   559             bb.flip();
       
   560             if (n > 0 && rem > 0)
       
   561                 dst.put(bb);
       
   562             return n;
       
   563         } finally {
       
   564             Util.releaseTemporaryDirectBuffer(bb);
       
   565         }
       
   566     }
       
   567 
       
   568     private int receiveIntoNativeBuffer(int fd,
       
   569                                         SctpResultContainer resultContainer,
       
   570                                         ByteBuffer bb,
       
   571                                         int rem,
       
   572                                         int pos)
       
   573             throws IOException {
       
   574         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem);
       
   575         if (n > 0)
       
   576             bb.position(pos + n);
       
   577         return n;
       
   578     }
       
   579 
       
   580     private InternalNotificationHandler internalNotificationHandler =
       
   581             new InternalNotificationHandler();
       
   582 
       
   583     private void handleNotificationInternal(SctpResultContainer resultContainer)
       
   584     {
       
   585         invokeNotificationHandler(resultContainer,
       
   586                 internalNotificationHandler, null);
       
   587     }
       
   588 
       
   589     private class InternalNotificationHandler
       
   590             extends AbstractNotificationHandler<Object>
       
   591     {
       
   592         @Override
       
   593         public HandlerResult handleNotification(
       
   594                 AssociationChangeNotification not, Object unused) {
       
   595             SctpAssocChange sac = (SctpAssocChange) not;
       
   596 
       
   597             /* Update map to reflect change in association */
       
   598             switch (not.event()) {
       
   599                 case COMM_UP :
       
   600                     Association newAssociation = new SctpAssociationImpl
       
   601                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
       
   602                     addAssociation(newAssociation);
       
   603                     break;
       
   604                 case SHUTDOWN :
       
   605                 case COMM_LOST :
       
   606                 //case RESTART: ???
       
   607                     /* mark association for removal after user handler invoked*/
       
   608                     associationToRemove.set(lookupAssociation(sac.assocId()));
       
   609             }
       
   610             return HandlerResult.CONTINUE;
       
   611         }
       
   612     }
       
   613 
       
   614     private <T> HandlerResult invokeNotificationHandler(
       
   615                                    SctpResultContainer resultContainer,
       
   616                                    NotificationHandler<T> handler,
       
   617                                    T attachment) {
       
   618         HandlerResult result;
       
   619         SctpNotification notification = resultContainer.notification();
       
   620         notification.setAssociation(lookupAssociation(notification.assocId()));
       
   621 
       
   622         if (!(handler instanceof AbstractNotificationHandler)) {
       
   623             result = handler.handleNotification(notification, attachment);
       
   624         } else { /* AbstractNotificationHandler */
       
   625             AbstractNotificationHandler<T> absHandler =
       
   626                     (AbstractNotificationHandler<T>)handler;
       
   627             switch(resultContainer.type()) {
       
   628                 case ASSOCIATION_CHANGED :
       
   629                     result = absHandler.handleNotification(
       
   630                             resultContainer.getAssociationChanged(), attachment);
       
   631                     break;
       
   632                 case PEER_ADDRESS_CHANGED :
       
   633                     result = absHandler.handleNotification(
       
   634                             resultContainer.getPeerAddressChanged(), attachment);
       
   635                     break;
       
   636                 case SEND_FAILED :
       
   637                     result = absHandler.handleNotification(
       
   638                             resultContainer.getSendFailed(), attachment);
       
   639                     break;
       
   640                 case SHUTDOWN :
       
   641                     result =  absHandler.handleNotification(
       
   642                             resultContainer.getShutdown(), attachment);
       
   643                     break;
       
   644                 default :
       
   645                     /* implementation specific handlers */
       
   646                     result =  absHandler.handleNotification(
       
   647                             resultContainer.notification(), attachment);
       
   648             }
       
   649         }
       
   650 
       
   651         if (!(handler instanceof InternalNotificationHandler)) {
       
   652             /* Only remove associations after user handler
       
   653              * has finished with them */
       
   654             Association assoc = associationToRemove.get();
       
   655             if (assoc != null) {
       
   656                 removeAssociation(assoc);
       
   657                 associationToRemove.set(null);
       
   658             }
       
   659 
       
   660         }
       
   661 
       
   662         return result;
       
   663     }
       
   664 
       
   665     private Association lookupAssociation(int assocId) {
       
   666         /* Lookup the association in our internal map */
       
   667         synchronized (stateLock) {
       
   668             Set<Association> assocs = associationMap.keySet();
       
   669             for (Association a : assocs) {
       
   670                 if (a.associationID() == assocId) {
       
   671                     return a;
       
   672                 }
       
   673             }
       
   674         }
       
   675         return null;
       
   676     }
       
   677 
       
   678     private void addAssociation(Association association) {
       
   679         synchronized (stateLock) {
       
   680             int assocId = association.associationID();
       
   681             Set<SocketAddress> addresses = null;
       
   682 
       
   683             try {
       
   684                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
       
   685             } catch (IOException unused) {
       
   686                 /* OK, determining connected addresses may not be possible
       
   687                  * shutdown, connection lost, etc */
       
   688             }
       
   689 
       
   690             associationMap.put(association, addresses);
       
   691             if (addresses != null) {
       
   692                 for (SocketAddress addr : addresses)
       
   693                     addressMap.put(addr, association);
       
   694             }
       
   695         }
       
   696     }
       
   697 
       
   698     private void removeAssociation(Association association) {
       
   699         synchronized (stateLock) {
       
   700             int assocId = association.associationID();
       
   701             Set<SocketAddress> addresses = null;
       
   702 
       
   703              try {
       
   704                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
       
   705             } catch (IOException unused) {
       
   706                 /* OK, determining connected addresses may not be possible
       
   707                  * shutdown, connection lost, etc */
       
   708             }
       
   709 
       
   710             Set<Association> assocs = associationMap.keySet();
       
   711             for (Association a : assocs) {
       
   712                 if (a.associationID() == assocId) {
       
   713                     associationMap.remove(a);
       
   714                     break;
       
   715                 }
       
   716             }
       
   717             if (addresses != null) {
       
   718                 for (SocketAddress addr : addresses)
       
   719                     addressMap.remove(addr);
       
   720             } else {
       
   721                 /* We cannot determine the connected addresses */
       
   722                 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs =
       
   723                         addressMap.entrySet();
       
   724                 Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator();
       
   725                 while (iterator.hasNext()) {
       
   726                     Entry<SocketAddress, Association> entry = iterator.next();
       
   727                     if (entry.getValue().equals(association)) {
       
   728                         iterator.remove();
       
   729                     }
       
   730                 }
       
   731             }
       
   732         }
       
   733     }
       
   734 
       
   735     /**
       
   736      * @throws  IllegalArgumentException
       
   737      *          If the given association is not controlled by this channel
       
   738      *
       
   739      * @return  {@code true} if, and only if, the given association is one
       
   740      *          of the current associations controlled by this channel
       
   741      */
       
   742     private boolean checkAssociation(Association messageAssoc) {
       
   743         synchronized (stateLock) {
       
   744             for (Association association : associationMap.keySet()) {
       
   745                 if (messageAssoc.equals(association)) {
       
   746                     return true;
       
   747                 }
       
   748             }
       
   749         }
       
   750         throw new IllegalArgumentException(
       
   751               "Given Association is not controlled by this channel");
       
   752     }
       
   753 
       
   754     private void checkStreamNumber(Association assoc, int streamNumber) {
       
   755         synchronized (stateLock) {
       
   756             if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams())
       
   757                 throw new InvalidStreamException();
       
   758         }
       
   759     }
       
   760 
       
    /* TODO: Add support for ttl and isComplete to both 121 12M
     *       (presumably the one-to-one and one-to-many channels - confirm)
     *       SCTP_EOR not yet supported on reference platforms
     *       TTL support limited...
     */
    /**
     * Sends the content of {@code buffer} to the destination described by
     * {@code messageInfo}.
     *
     * <p>The destination is resolved while holding {@code stateLock}:
     * <ul>
     * <li>If {@code messageInfo} carries an association, it must be
     *     controlled by this channel and the stream number must be valid for
     *     it. An address, if also given, is used as the preferred address and
     *     must map to that same association in {@code addressMap}.</li>
     * <li>Otherwise, if only an address is given, the association already
     *     mapped to that address is used. If there is none, the send is
     *     treated as starting a new association and the security manager,
     *     when installed, is asked to permit the connect.</li>
     * </ul>
     *
     * <p>The channel is bound first if it is not bound already. The whole
     * operation runs under {@code sendLock}.
     *
     * @param  buffer       the data to send
     * @param  messageInfo  destination and ancillary send parameters
     * @return {@code 0} if the channel is found closed once
     *         {@code stateLock} is held; otherwise the value of the
     *         underlying send passed through {@code IOStatus.normalize}
     * @throws IllegalArgumentException
     *         if {@code buffer} or {@code messageInfo} is {@code null}, if
     *         the association is not controlled by this channel, or if the
     *         preferred address does not belong to the given association
     * @throws InvalidStreamException
     *         if the stream number is out of range for the association
     * @throws AssertionError
     *         if {@code messageInfo} has neither an association nor an
     *         address
     * @throws IOException
     *         if an I/O error occurs
     */
    @Override
    public int send(ByteBuffer buffer, MessageInfo messageInfo)
            throws IOException {
        if (buffer == null)
            throw new IllegalArgumentException("buffer cannot be null");

        if (messageInfo == null)
            throw new IllegalArgumentException("messageInfo cannot be null");

        synchronized (sendLock) {
            ensureOpen();

            if (!isBound())
                bind(null, 0);

            /* n stays 0 unless the native send runs; the finally block
             * reports it to end() and checks it */
            int n = 0;
            try {
                /* -1 / null are the values passed down when no association
                 * or no address has been resolved */
                int assocId = -1;
                SocketAddress address = null;
                begin();

                synchronized (stateLock) {
                    if(!isOpen())
                        return 0;
                    /* record the native thread performing this send;
                     * senderCleanup() runs in the finally block below */
                    senderThread = NativeThread.current();

                    /* Determine what address or association to send to */
                    Association assoc = messageInfo.association();
                    InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
                    if (assoc != null) {
                        checkAssociation(assoc);
                        checkStreamNumber(assoc, messageInfo.streamNumber());
                        assocId = assoc.associationID();
                        /* have we also got a preferred address */
                        if (addr != null) {
                            if (!assoc.equals(addressMap.get(addr)))
                                throw new IllegalArgumentException("given preferred address is not part of this association");
                            address = addr;
                        }
                    } else if (addr != null) {
                        address = addr;
                        Association association = addressMap.get(addr);
                        if (association != null) {
                            /* address already belongs to a known association */
                            checkStreamNumber(association, messageInfo.streamNumber());
                            assocId = association.associationID();

                        } else { /* must be new association */
                            SecurityManager sm = System.getSecurityManager();
                            if (sm != null)
                                sm.checkConnect(addr.getAddress().getHostAddress(),
                                                addr.getPort());
                        }
                    } else {
                        throw new AssertionError(
                            "Both association and address cannot be null");
                    }
                }

                /* retry while the send is interrupted and the channel is
                 * still open */
                do {
                    n = send(fdVal, buffer, assocId, address, messageInfo);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());

                return IOStatus.normalize(n);
            } finally {
                senderCleanup();
                end((n > 0) || (n == IOStatus.UNAVAILABLE));
                assert IOStatus.check(n);
            }
        }
    }
       
   835 
       
   836     private int send(int fd,
       
   837                      ByteBuffer src,
       
   838                      int assocId,
       
   839                      SocketAddress target,
       
   840                      MessageInfo messageInfo)
       
   841             throws IOException {
       
   842         int streamNumber = messageInfo.streamNumber();
       
   843         boolean unordered = messageInfo.isUnordered();
       
   844         int ppid = messageInfo.payloadProtocolID();
       
   845 
       
   846         if (src instanceof DirectBuffer)
       
   847             return sendFromNativeBuffer(fd, src, target, assocId,
       
   848                     streamNumber, unordered, ppid);
       
   849 
       
   850         /* Substitute a native buffer */
       
   851         int pos = src.position();
       
   852         int lim = src.limit();
       
   853         assert (pos <= lim && streamNumber >= 0);
       
   854 
       
   855         int rem = (pos <= lim ? lim - pos : 0);
       
   856         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
       
   857         try {
       
   858             bb.put(src);
       
   859             bb.flip();
       
   860             /* Do not update src until we see how many bytes were written */
       
   861             src.position(pos);
       
   862 
       
   863             int n = sendFromNativeBuffer(fd, bb, target, assocId,
       
   864                     streamNumber, unordered, ppid);
       
   865             if (n > 0) {
       
   866                 /* now update src */
       
   867                 src.position(pos + n);
       
   868             }
       
   869             return n;
       
   870         } finally {
       
   871             Util.releaseTemporaryDirectBuffer(bb);
       
   872         }
       
   873     }
       
   874 
       
   875     private int sendFromNativeBuffer(int fd,
       
   876                                      ByteBuffer bb,
       
   877                                      SocketAddress target,
       
   878                                      int assocId,
       
   879                                      int streamNumber,
       
   880                                      boolean unordered,
       
   881                                      int ppid)
       
   882             throws IOException {
       
   883         int pos = bb.position();
       
   884         int lim = bb.limit();
       
   885         assert (pos <= lim);
       
   886         int rem = (pos <= lim ? lim - pos : 0);
       
   887 
       
   888         int written = send0(fd, ((DirectBuffer)bb).address() + pos,
       
   889                             rem, target, assocId, streamNumber, unordered, ppid);
       
   890         if (written > 0)
       
   891             bb.position(pos + written);
       
   892         return written;
       
   893     }
       
   894 
       
   895     @Override
       
   896     public SctpMultiChannel shutdown(Association association)
       
   897             throws IOException {
       
   898         synchronized (stateLock) {
       
   899             checkAssociation(association);
       
   900             if (!isOpen())
       
   901                 throw new ClosedChannelException();
       
   902 
       
   903             SctpNet.shutdown(fdVal, association.associationID());
       
   904         }
       
   905         return this;
       
   906     }
       
   907 
       
   908     @Override
       
   909     public Set<SocketAddress> getAllLocalAddresses()
       
   910             throws IOException {
       
   911         synchronized (stateLock) {
       
   912             if (!isOpen())
       
   913                 throw new ClosedChannelException();
       
   914             if (!isBound())
       
   915                 return Collections.emptySet();
       
   916 
       
   917             return SctpNet.getLocalAddresses(fdVal);
       
   918         }
       
   919     }
       
   920 
       
   921     @Override
       
   922     public Set<SocketAddress> getRemoteAddresses(Association association)
       
   923             throws IOException {
       
   924         synchronized (stateLock) {
       
   925             checkAssociation(association);
       
   926             if (!isOpen())
       
   927                 throw new ClosedChannelException();
       
   928 
       
   929             try {
       
   930                 return SctpNet.getRemoteAddresses(fdVal, association.associationID());
       
   931             } catch (SocketException se) {
       
   932                 /* a valid association should always have remote addresses */
       
   933                 Set<SocketAddress> addrs = associationMap.get(association);
       
   934                 return addrs != null ? addrs : Collections.<SocketAddress>emptySet();
       
   935             }
       
   936         }
       
   937     }
       
   938 
       
   939     @Override
       
   940     public SctpChannel branch(Association association)
       
   941             throws IOException {
       
   942         synchronized (stateLock) {
       
   943             checkAssociation(association);
       
   944             if (!isOpen())
       
   945                 throw new ClosedChannelException();
       
   946 
       
   947             FileDescriptor bFd = SctpNet.branch(fdVal,
       
   948                                                 association.associationID());
       
   949             /* successfully branched, we can now remove it from assoc list */
       
   950             removeAssociation(association);
       
   951 
       
   952             return new SctpChannelImpl(provider(), bFd, association);
       
   953         }
       
   954     }
       
   955 
       
   956     /* Use common native implementation shared between
       
   957      * one-to-one and one-to-many */
       
   958     private static int receive0(int fd,
       
   959                                 SctpResultContainer resultContainer,
       
   960                                 long address,
       
   961                                 int length)
       
   962             throws IOException{
       
   963         return SctpChannelImpl.receive0(fd, resultContainer, address,
       
   964                 length, false /*peek */);
       
   965     }
       
   966 
       
   967     private static int send0(int fd,
       
   968                              long address,
       
   969                              int length,
       
   970                              SocketAddress target,
       
   971                              int assocId,
       
   972                              int streamNumber,
       
   973                              boolean unordered,
       
   974                              int ppid)
       
   975             throws IOException {
       
   976         return SctpChannelImpl.send0(fd, address, length, target, assocId,
       
   977                 streamNumber, unordered, ppid);
       
   978     }
       
   979 
       
    /* Class initialization: load the native libraries this class uses. */
    static {
        Util.load();   /* loads nio & net native libraries */
        /* then load the "sctp" library through a LoadLibraryAction run
         * under doPrivileged */
        java.security.AccessController.doPrivileged(
                new sun.security.action.LoadLibraryAction("sctp"));
    }
       
   985 }