src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java
changeset 54620 13b67c1420b8
parent 53419 eac105e3ec13
child 54754 193a8f1a4f3b
equal deleted inserted replaced
54619:b43cc3b9ef40 54620:13b67c1420b8
    51 import java.nio.channels.spi.SelectorProvider;
    51 import java.nio.channels.spi.SelectorProvider;
    52 import java.util.Collections;
    52 import java.util.Collections;
    53 import java.util.HashSet;
    53 import java.util.HashSet;
    54 import java.util.Objects;
    54 import java.util.Objects;
    55 import java.util.Set;
    55 import java.util.Set;
       
    56 import java.util.concurrent.locks.Condition;
    56 import java.util.concurrent.locks.ReentrantLock;
    57 import java.util.concurrent.locks.ReentrantLock;
    57 
    58 
    58 import sun.net.ResourceManager;
    59 import sun.net.ResourceManager;
    59 import sun.net.ext.ExtendedSocketOptions;
    60 import sun.net.ext.ExtendedSocketOptions;
    60 
    61 
    87     // Lock held by current writing or connecting thread
    88     // Lock held by current writing or connecting thread
    88     private final ReentrantLock writeLock = new ReentrantLock();
    89     private final ReentrantLock writeLock = new ReentrantLock();
    89 
    90 
    90     // Lock held by any thread that modifies the state fields declared below
    91     // Lock held by any thread that modifies the state fields declared below
    91     // DO NOT invoke a blocking I/O operation while holding this lock!
    92     // DO NOT invoke a blocking I/O operation while holding this lock!
    92     private final Object stateLock = new Object();
    93     private final ReentrantLock stateLock = new ReentrantLock();
       
    94     private final Condition stateCondition = stateLock.newCondition();
    93 
    95 
    94     // -- The following fields are protected by stateLock
    96     // -- The following fields are protected by stateLock
    95 
    97 
    96     // State (does not necessarily increase monotonically)
    98     // State (does not necessarily increase monotonically)
    97     private static final int ST_UNCONNECTED = 0;
    99     private static final int ST_UNCONNECTED = 0;
   177         this.family = Net.isIPv6Available()
   179         this.family = Net.isIPv6Available()
   178                 ? StandardProtocolFamily.INET6
   180                 ? StandardProtocolFamily.INET6
   179                 : StandardProtocolFamily.INET;
   181                 : StandardProtocolFamily.INET;
   180         this.fd = fd;
   182         this.fd = fd;
   181         this.fdVal = IOUtil.fdVal(fd);
   183         this.fdVal = IOUtil.fdVal(fd);
   182         synchronized (stateLock) {
   184         stateLock.lock();
       
   185         try {
   183             this.localAddress = Net.localAddress(fd);
   186             this.localAddress = Net.localAddress(fd);
       
   187         } finally {
       
   188             stateLock.unlock();
   184         }
   189         }
   185     }
   190     }
   186 
   191 
   187     // @throws ClosedChannelException if channel is closed
   192     // @throws ClosedChannelException if channel is closed
   188     private void ensureOpen() throws ClosedChannelException {
   193     private void ensureOpen() throws ClosedChannelException {
   190             throw new ClosedChannelException();
   195             throw new ClosedChannelException();
   191     }
   196     }
   192 
   197 
   193     @Override
   198     @Override
   194     public DatagramSocket socket() {
   199     public DatagramSocket socket() {
   195         synchronized (stateLock) {
   200         stateLock.lock();
       
   201         try {
   196             if (socket == null)
   202             if (socket == null)
   197                 socket = DatagramSocketAdaptor.create(this);
   203                 socket = DatagramSocketAdaptor.create(this);
   198             return socket;
   204             return socket;
       
   205         } finally {
       
   206             stateLock.unlock();
   199         }
   207         }
   200     }
   208     }
   201 
   209 
   202     @Override
   210     @Override
   203     public SocketAddress getLocalAddress() throws IOException {
   211     public SocketAddress getLocalAddress() throws IOException {
   204         synchronized (stateLock) {
   212         stateLock.lock();
       
   213         try {
   205             ensureOpen();
   214             ensureOpen();
   206             // Perform security check before returning address
   215             // Perform security check before returning address
   207             return Net.getRevealedLocalAddress(localAddress);
   216             return Net.getRevealedLocalAddress(localAddress);
       
   217         } finally {
       
   218             stateLock.unlock();
   208         }
   219         }
   209     }
   220     }
   210 
   221 
   211     @Override
   222     @Override
   212     public SocketAddress getRemoteAddress() throws IOException {
   223     public SocketAddress getRemoteAddress() throws IOException {
   213         synchronized (stateLock) {
   224         stateLock.lock();
       
   225         try {
   214             ensureOpen();
   226             ensureOpen();
   215             return remoteAddress;
   227             return remoteAddress;
       
   228         } finally {
       
   229             stateLock.unlock();
   216         }
   230         }
   217     }
   231     }
   218 
   232 
   219     @Override
   233     @Override
   220     public <T> DatagramChannel setOption(SocketOption<T> name, T value)
   234     public <T> DatagramChannel setOption(SocketOption<T> name, T value)
   222     {
   236     {
   223         Objects.requireNonNull(name);
   237         Objects.requireNonNull(name);
   224         if (!supportedOptions().contains(name))
   238         if (!supportedOptions().contains(name))
   225             throw new UnsupportedOperationException("'" + name + "' not supported");
   239             throw new UnsupportedOperationException("'" + name + "' not supported");
   226 
   240 
   227         synchronized (stateLock) {
   241         stateLock.lock();
       
   242         try {
   228             ensureOpen();
   243             ensureOpen();
   229 
   244 
   230             if (name == StandardSocketOptions.IP_TOS ||
   245             if (name == StandardSocketOptions.IP_TOS ||
   231                 name == StandardSocketOptions.IP_MULTICAST_TTL ||
   246                 name == StandardSocketOptions.IP_MULTICAST_TTL ||
   232                 name == StandardSocketOptions.IP_MULTICAST_LOOP)
   247                 name == StandardSocketOptions.IP_MULTICAST_LOOP)
   262             }
   277             }
   263 
   278 
   264             // remaining options don't need any special handling
   279             // remaining options don't need any special handling
   265             Net.setSocketOption(fd, Net.UNSPEC, name, value);
   280             Net.setSocketOption(fd, Net.UNSPEC, name, value);
   266             return this;
   281             return this;
       
   282         } finally {
       
   283             stateLock.unlock();
   267         }
   284         }
   268     }
   285     }
   269 
   286 
   270     @Override
   287     @Override
   271     @SuppressWarnings("unchecked")
   288     @SuppressWarnings("unchecked")
   274     {
   291     {
   275         Objects.requireNonNull(name);
   292         Objects.requireNonNull(name);
   276         if (!supportedOptions().contains(name))
   293         if (!supportedOptions().contains(name))
   277             throw new UnsupportedOperationException("'" + name + "' not supported");
   294             throw new UnsupportedOperationException("'" + name + "' not supported");
   278 
   295 
   279         synchronized (stateLock) {
   296         stateLock.lock();
       
   297         try {
   280             ensureOpen();
   298             ensureOpen();
   281 
   299 
   282             if (name == StandardSocketOptions.IP_TOS ||
   300             if (name == StandardSocketOptions.IP_TOS ||
   283                 name == StandardSocketOptions.IP_MULTICAST_TTL ||
   301                 name == StandardSocketOptions.IP_MULTICAST_TTL ||
   284                 name == StandardSocketOptions.IP_MULTICAST_LOOP)
   302                 name == StandardSocketOptions.IP_MULTICAST_LOOP)
   313                 return (T)Boolean.valueOf(isReuseAddress);
   331                 return (T)Boolean.valueOf(isReuseAddress);
   314             }
   332             }
   315 
   333 
   316             // no special handling
   334             // no special handling
   317             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
   335             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
       
   336         } finally {
       
   337             stateLock.unlock();
   318         }
   338         }
   319     }
   339     }
   320 
   340 
   321     private static class DefaultOptionsHolder {
   341     private static class DefaultOptionsHolder {
   322         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
   342         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
   360         if (blocking) {
   380         if (blocking) {
   361             // set hook for Thread.interrupt
   381             // set hook for Thread.interrupt
   362             begin();
   382             begin();
   363         }
   383         }
   364         SocketAddress remote;
   384         SocketAddress remote;
   365         synchronized (stateLock) {
   385         stateLock.lock();
       
   386         try {
   366             ensureOpen();
   387             ensureOpen();
   367             remote = remoteAddress;
   388             remote = remoteAddress;
   368             if ((remote == null) && mustBeConnected)
   389             if ((remote == null) && mustBeConnected)
   369                 throw new NotYetConnectedException();
   390                 throw new NotYetConnectedException();
   370             if (localAddress == null)
   391             if (localAddress == null)
   371                 bindInternal(null);
   392                 bindInternal(null);
   372             if (blocking)
   393             if (blocking)
   373                 readerThread = NativeThread.current();
   394                 readerThread = NativeThread.current();
       
   395         } finally {
       
   396             stateLock.unlock();
   374         }
   397         }
   375         return remote;
   398         return remote;
   376     }
   399     }
   377 
   400 
   378     /**
   401     /**
   382      */
   405      */
   383     private void endRead(boolean blocking, boolean completed)
   406     private void endRead(boolean blocking, boolean completed)
   384         throws AsynchronousCloseException
   407         throws AsynchronousCloseException
   385     {
   408     {
   386         if (blocking) {
   409         if (blocking) {
   387             synchronized (stateLock) {
   410             stateLock.lock();
       
   411             try {
   388                 readerThread = 0;
   412                 readerThread = 0;
   389                 // notify any thread waiting in implCloseSelectableChannel
   413                 // notify any thread waiting in implCloseSelectableChannel
   390                 if (state == ST_CLOSING) {
   414                 if (state == ST_CLOSING) {
   391                     stateLock.notifyAll();
   415                     stateCondition.signalAll();
   392                 }
   416                 }
       
   417             } finally {
       
   418                 stateLock.unlock();
   393             }
   419             }
   394             // remove hook for Thread.interrupt
   420             // remove hook for Thread.interrupt
   395             end(completed);
   421             end(completed);
   396         }
   422         }
   397     }
   423     }
   412                 SocketAddress remote = beginRead(blocking, false);
   438                 SocketAddress remote = beginRead(blocking, false);
   413                 boolean connected = (remote != null);
   439                 boolean connected = (remote != null);
   414                 SecurityManager sm = System.getSecurityManager();
   440                 SecurityManager sm = System.getSecurityManager();
   415                 if (connected || (sm == null)) {
   441                 if (connected || (sm == null)) {
   416                     // connected or no security manager
   442                     // connected or no security manager
   417                     do {
   443                     n = receive(fd, dst, connected);
   418                         n = receive(fd, dst, connected);
   444                     if (blocking) {
   419                     } while ((n == IOStatus.INTERRUPTED) && isOpen());
   445                         while (IOStatus.okayToRetry(n) && isOpen()) {
   420                     if (n == IOStatus.UNAVAILABLE)
   446                             park(Net.POLLIN);
       
   447                             n = receive(fd, dst, connected);
       
   448                         }
       
   449                     } else if (n == IOStatus.UNAVAILABLE) {
   421                         return null;
   450                         return null;
       
   451                     }
   422                 } else {
   452                 } else {
   423                     // Cannot receive into user's buffer when running with a
   453                     // Cannot receive into user's buffer when running with a
   424                     // security manager and not connected
   454                     // security manager and not connected
   425                     bb = Util.getTemporaryDirectBuffer(dst.remaining());
   455                     bb = Util.getTemporaryDirectBuffer(dst.remaining());
   426                     for (;;) {
   456                     for (;;) {
   427                         do {
   457                         n = receive(fd, bb, connected);
   428                             n = receive(fd, bb, connected);
   458                         if (blocking) {
   429                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
   459                             while (IOStatus.okayToRetry(n) && isOpen()) {
   430                         if (n == IOStatus.UNAVAILABLE)
   460                                 park(Net.POLLIN);
       
   461                                 n = receive(fd, bb, connected);
       
   462                             }
       
   463                         } else if (n == IOStatus.UNAVAILABLE) {
   431                             return null;
   464                             return null;
       
   465                         }
   432                         InetSocketAddress isa = (InetSocketAddress)sender;
   466                         InetSocketAddress isa = (InetSocketAddress)sender;
   433                         try {
   467                         try {
   434                             sm.checkAccept(isa.getAddress().getHostAddress(),
   468                             sm.checkAccept(isa.getAddress().getHostAddress(),
   435                                            isa.getPort());
   469                                            isa.getPort());
   436                         } catch (SecurityException se) {
   470                         } catch (SecurityException se) {
   491         if (n > 0)
   525         if (n > 0)
   492             bb.position(pos + n);
   526             bb.position(pos + n);
   493         return n;
   527         return n;
   494     }
   528     }
   495 
   529 
       
   530     @Override
   496     public int send(ByteBuffer src, SocketAddress target)
   531     public int send(ByteBuffer src, SocketAddress target)
   497         throws IOException
   532         throws IOException
   498     {
   533     {
   499         Objects.requireNonNull(src);
   534         Objects.requireNonNull(src);
   500         InetSocketAddress isa = Net.checkAddress(target, family);
   535         InetSocketAddress isa = Net.checkAddress(target, family);
   508                 if (remote != null) {
   543                 if (remote != null) {
   509                     // connected
   544                     // connected
   510                     if (!target.equals(remote)) {
   545                     if (!target.equals(remote)) {
   511                         throw new AlreadyConnectedException();
   546                         throw new AlreadyConnectedException();
   512                     }
   547                     }
   513                     do {
   548                     n = IOUtil.write(fd, src, -1, nd);
   514                         n = IOUtil.write(fd, src, -1, nd);
   549                     if (blocking) {
   515                     } while ((n == IOStatus.INTERRUPTED) && isOpen());
   550                         while (IOStatus.okayToRetry(n) && isOpen()) {
       
   551                             park(Net.POLLOUT);
       
   552                             n = IOUtil.write(fd, src, -1, nd);
       
   553                         }
       
   554                     }
   516                 } else {
   555                 } else {
   517                     // not connected
   556                     // not connected
   518                     SecurityManager sm = System.getSecurityManager();
   557                     SecurityManager sm = System.getSecurityManager();
   519                     if (sm != null) {
   558                     if (sm != null) {
   520                         InetAddress ia = isa.getAddress();
   559                         InetAddress ia = isa.getAddress();
   522                             sm.checkMulticast(ia);
   561                             sm.checkMulticast(ia);
   523                         } else {
   562                         } else {
   524                             sm.checkConnect(ia.getHostAddress(), isa.getPort());
   563                             sm.checkConnect(ia.getHostAddress(), isa.getPort());
   525                         }
   564                         }
   526                     }
   565                     }
   527                     do {
   566                     n = send(fd, src, isa);
   528                         n = send(fd, src, isa);
   567                     if (blocking) {
   529                     } while ((n == IOStatus.INTERRUPTED) && isOpen());
   568                         while (IOStatus.okayToRetry(n) && isOpen()) {
       
   569                             park(Net.POLLOUT);
       
   570                             n = send(fd, src, isa);
       
   571                         }
       
   572                     }
   530                 }
   573                 }
   531             } finally {
   574             } finally {
   532                 endWrite(blocking, n > 0);
   575                 endWrite(blocking, n > 0);
   533                 assert IOStatus.check(n);
   576                 assert IOStatus.check(n);
   534             }
   577             }
   600         try {
   643         try {
   601             boolean blocking = isBlocking();
   644             boolean blocking = isBlocking();
   602             int n = 0;
   645             int n = 0;
   603             try {
   646             try {
   604                 beginRead(blocking, true);
   647                 beginRead(blocking, true);
   605                 do {
   648                 n = IOUtil.read(fd, buf, -1, nd);
   606                     n = IOUtil.read(fd, buf, -1, nd);
   649                 if (blocking) {
   607                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   650                     while (IOStatus.okayToRetry(n) && isOpen()) {
   608 
   651                         park(Net.POLLIN);
       
   652                         n = IOUtil.read(fd, buf, -1, nd);
       
   653                     }
       
   654                 }
   609             } finally {
   655             } finally {
   610                 endRead(blocking, n > 0);
   656                 endRead(blocking, n > 0);
   611                 assert IOStatus.check(n);
   657                 assert IOStatus.check(n);
   612             }
   658             }
   613             return IOStatus.normalize(n);
   659             return IOStatus.normalize(n);
   626         try {
   672         try {
   627             boolean blocking = isBlocking();
   673             boolean blocking = isBlocking();
   628             long n = 0;
   674             long n = 0;
   629             try {
   675             try {
   630                 beginRead(blocking, true);
   676                 beginRead(blocking, true);
   631                 do {
   677                 n = IOUtil.read(fd, dsts, offset, length, nd);
   632                     n = IOUtil.read(fd, dsts, offset, length, nd);
   678                 if (blocking) {
   633                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   679                     while (IOStatus.okayToRetry(n) && isOpen()) {
   634 
   680                         park(Net.POLLIN);
       
   681                         n = IOUtil.read(fd, dsts, offset, length, nd);
       
   682                     }
       
   683                 }
   635             } finally {
   684             } finally {
   636                 endRead(blocking, n > 0);
   685                 endRead(blocking, n > 0);
   637                 assert IOStatus.check(n);
   686                 assert IOStatus.check(n);
   638             }
   687             }
   639             return IOStatus.normalize(n);
   688             return IOStatus.normalize(n);
   657         if (blocking) {
   706         if (blocking) {
   658             // set hook for Thread.interrupt
   707             // set hook for Thread.interrupt
   659             begin();
   708             begin();
   660         }
   709         }
   661         SocketAddress remote;
   710         SocketAddress remote;
   662         synchronized (stateLock) {
   711         stateLock.lock();
       
   712         try {
   663             ensureOpen();
   713             ensureOpen();
   664             remote = remoteAddress;
   714             remote = remoteAddress;
   665             if ((remote == null) && mustBeConnected)
   715             if ((remote == null) && mustBeConnected)
   666                 throw new NotYetConnectedException();
   716                 throw new NotYetConnectedException();
   667             if (localAddress == null)
   717             if (localAddress == null)
   668                 bindInternal(null);
   718                 bindInternal(null);
   669             if (blocking)
   719             if (blocking)
   670                 writerThread = NativeThread.current();
   720                 writerThread = NativeThread.current();
       
   721         } finally {
       
   722             stateLock.unlock();
   671         }
   723         }
   672         return remote;
   724         return remote;
   673     }
   725     }
   674 
   726 
   675     /**
   727     /**
   679      */
   731      */
   680     private void endWrite(boolean blocking, boolean completed)
   732     private void endWrite(boolean blocking, boolean completed)
   681         throws AsynchronousCloseException
   733         throws AsynchronousCloseException
   682     {
   734     {
   683         if (blocking) {
   735         if (blocking) {
   684             synchronized (stateLock) {
   736             stateLock.lock();
       
   737             try {
   685                 writerThread = 0;
   738                 writerThread = 0;
   686                 // notify any thread waiting in implCloseSelectableChannel
   739                 // notify any thread waiting in implCloseSelectableChannel
   687                 if (state == ST_CLOSING) {
   740                 if (state == ST_CLOSING) {
   688                     stateLock.notifyAll();
   741                     stateCondition.signalAll();
   689                 }
   742                 }
       
   743             } finally {
       
   744                 stateLock.unlock();
   690             }
   745             }
   691             // remove hook for Thread.interrupt
   746             // remove hook for Thread.interrupt
   692             end(completed);
   747             end(completed);
   693         }
   748         }
   694     }
   749     }
   701         try {
   756         try {
   702             boolean blocking = isBlocking();
   757             boolean blocking = isBlocking();
   703             int n = 0;
   758             int n = 0;
   704             try {
   759             try {
   705                 beginWrite(blocking, true);
   760                 beginWrite(blocking, true);
   706                 do {
   761                 n = IOUtil.write(fd, buf, -1, nd);
   707                     n = IOUtil.write(fd, buf, -1, nd);
   762                 if (blocking) {
   708                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   763                     while (IOStatus.okayToRetry(n) && isOpen()) {
       
   764                         park(Net.POLLOUT);
       
   765                         n = IOUtil.write(fd, buf, -1, nd);
       
   766                     }
       
   767                 }
   709             } finally {
   768             } finally {
   710                 endWrite(blocking, n > 0);
   769                 endWrite(blocking, n > 0);
   711                 assert IOStatus.check(n);
   770                 assert IOStatus.check(n);
   712             }
   771             }
   713             return IOStatus.normalize(n);
   772             return IOStatus.normalize(n);
   726         try {
   785         try {
   727             boolean blocking = isBlocking();
   786             boolean blocking = isBlocking();
   728             long n = 0;
   787             long n = 0;
   729             try {
   788             try {
   730                 beginWrite(blocking, true);
   789                 beginWrite(blocking, true);
   731                 do {
   790                 n = IOUtil.write(fd, srcs, offset, length, nd);
   732                     n = IOUtil.write(fd, srcs, offset, length, nd);
   791                 if (blocking) {
   733                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   792                     while (IOStatus.okayToRetry(n) && isOpen()) {
       
   793                         park(Net.POLLOUT);
       
   794                         n = IOUtil.write(fd, srcs, offset, length, nd);
       
   795                     }
       
   796                 }
   734             } finally {
   797             } finally {
   735                 endWrite(blocking, n > 0);
   798                 endWrite(blocking, n > 0);
   736                 assert IOStatus.check(n);
   799                 assert IOStatus.check(n);
   737             }
   800             }
   738             return IOStatus.normalize(n);
   801             return IOStatus.normalize(n);
   745     protected void implConfigureBlocking(boolean block) throws IOException {
   808     protected void implConfigureBlocking(boolean block) throws IOException {
   746         readLock.lock();
   809         readLock.lock();
   747         try {
   810         try {
   748             writeLock.lock();
   811             writeLock.lock();
   749             try {
   812             try {
   750                 synchronized (stateLock) {
   813                 stateLock.lock();
       
   814                 try {
   751                     ensureOpen();
   815                     ensureOpen();
   752                     IOUtil.configureBlocking(fd, block);
   816                     IOUtil.configureBlocking(fd, block);
       
   817                 } finally {
       
   818                     stateLock.unlock();
   753                 }
   819                 }
   754             } finally {
   820             } finally {
   755                 writeLock.unlock();
   821                 writeLock.unlock();
   756             }
   822             }
   757         } finally {
   823         } finally {
   758             readLock.unlock();
   824             readLock.unlock();
   759         }
   825         }
   760     }
   826     }
   761 
   827 
   762     InetSocketAddress localAddress() {
   828     InetSocketAddress localAddress() {
   763         synchronized (stateLock) {
   829         stateLock.lock();
       
   830         try {
   764             return localAddress;
   831             return localAddress;
       
   832         } finally {
       
   833             stateLock.unlock();
   765         }
   834         }
   766     }
   835     }
   767 
   836 
   768     InetSocketAddress remoteAddress() {
   837     InetSocketAddress remoteAddress() {
   769         synchronized (stateLock) {
   838         stateLock.lock();
       
   839         try {
   770             return remoteAddress;
   840             return remoteAddress;
       
   841         } finally {
       
   842             stateLock.unlock();
   771         }
   843         }
   772     }
   844     }
   773 
   845 
   774     @Override
   846     @Override
   775     public DatagramChannel bind(SocketAddress local) throws IOException {
   847     public DatagramChannel bind(SocketAddress local) throws IOException {
   776         readLock.lock();
   848         readLock.lock();
   777         try {
   849         try {
   778             writeLock.lock();
   850             writeLock.lock();
   779             try {
   851             try {
   780                 synchronized (stateLock) {
   852                 stateLock.lock();
       
   853                 try {
   781                     ensureOpen();
   854                     ensureOpen();
   782                     if (localAddress != null)
   855                     if (localAddress != null)
   783                         throw new AlreadyBoundException();
   856                         throw new AlreadyBoundException();
   784                     bindInternal(local);
   857                     bindInternal(local);
       
   858                 } finally {
       
   859                     stateLock.unlock();
   785                 }
   860                 }
   786             } finally {
   861             } finally {
   787                 writeLock.unlock();
   862                 writeLock.unlock();
   788             }
   863             }
   789         } finally {
   864         } finally {
   791         }
   866         }
   792         return this;
   867         return this;
   793     }
   868     }
   794 
   869 
   795     private void bindInternal(SocketAddress local) throws IOException {
   870     private void bindInternal(SocketAddress local) throws IOException {
   796         assert Thread.holdsLock(stateLock) && (localAddress == null);
   871         assert stateLock.isHeldByCurrentThread() && (localAddress == null);
   797 
   872 
   798         InetSocketAddress isa;
   873         InetSocketAddress isa;
   799         if (local == null) {
   874         if (local == null) {
   800             // only Inet4Address allowed with IPv4 socket
   875             // only Inet4Address allowed with IPv4 socket
   801             if (family == StandardProtocolFamily.INET) {
   876             if (family == StandardProtocolFamily.INET) {
   814         localAddress = Net.localAddress(fd);
   889         localAddress = Net.localAddress(fd);
   815     }
   890     }
   816 
   891 
   817     @Override
   892     @Override
   818     public boolean isConnected() {
   893     public boolean isConnected() {
   819         synchronized (stateLock) {
   894         stateLock.lock();
       
   895         try {
   820             return (state == ST_CONNECTED);
   896             return (state == ST_CONNECTED);
       
   897         } finally {
       
   898             stateLock.unlock();
   821         }
   899         }
   822     }
   900     }
   823 
   901 
   824     @Override
   902     @Override
   825     public DatagramChannel connect(SocketAddress sa) throws IOException {
   903     public DatagramChannel connect(SocketAddress sa) throws IOException {
   837 
   915 
   838         readLock.lock();
   916         readLock.lock();
   839         try {
   917         try {
   840             writeLock.lock();
   918             writeLock.lock();
   841             try {
   919             try {
   842                 synchronized (stateLock) {
   920                 stateLock.lock();
       
   921                 try {
   843                     ensureOpen();
   922                     ensureOpen();
   844                     if (state == ST_CONNECTED)
   923                     if (state == ST_CONNECTED)
   845                         throw new AlreadyConnectedException();
   924                         throw new AlreadyConnectedException();
   846 
   925 
   847                     int n = Net.connect(family,
   926                     int n = Net.connect(family,
   863                     if (blocking) {
   942                     if (blocking) {
   864                         IOUtil.configureBlocking(fd, false);
   943                         IOUtil.configureBlocking(fd, false);
   865                     }
   944                     }
   866                     try {
   945                     try {
   867                         ByteBuffer buf = ByteBuffer.allocate(100);
   946                         ByteBuffer buf = ByteBuffer.allocate(100);
   868                         while (receive(buf) != null) {
   947                         while (receive(fd, buf, false) > 0) {
   869                             buf.clear();
   948                             buf.clear();
   870                         }
   949                         }
   871                     } finally {
   950                     } finally {
   872                         if (blocking) {
   951                         if (blocking) {
   873                             IOUtil.configureBlocking(fd, true);
   952                             IOUtil.configureBlocking(fd, true);
   874                         }
   953                         }
   875                     }
   954                     }
       
   955 
       
   956                 } finally {
       
   957                     stateLock.unlock();
   876                 }
   958                 }
   877             } finally {
   959             } finally {
   878                 writeLock.unlock();
   960                 writeLock.unlock();
   879             }
   961             }
   880         } finally {
   962         } finally {
   887     public DatagramChannel disconnect() throws IOException {
   969     public DatagramChannel disconnect() throws IOException {
   888         readLock.lock();
   970         readLock.lock();
   889         try {
   971         try {
   890             writeLock.lock();
   972             writeLock.lock();
   891             try {
   973             try {
   892                 synchronized (stateLock) {
   974                 stateLock.lock();
       
   975                 try {
   893                     if (!isOpen() || (state != ST_CONNECTED))
   976                     if (!isOpen() || (state != ST_CONNECTED))
   894                         return this;
   977                         return this;
   895 
   978 
   896                     // disconnect socket
   979                     // disconnect socket
   897                     boolean isIPv6 = (family == StandardProtocolFamily.INET6);
   980                     boolean isIPv6 = (family == StandardProtocolFamily.INET6);
   901                     remoteAddress = null;
   984                     remoteAddress = null;
   902                     state = ST_UNCONNECTED;
   985                     state = ST_UNCONNECTED;
   903 
   986 
   904                     // refresh local address
   987                     // refresh local address
   905                     localAddress = Net.localAddress(fd);
   988                     localAddress = Net.localAddress(fd);
       
   989                 } finally {
       
   990                     stateLock.unlock();
   906                 }
   991                 }
   907             } finally {
   992             } finally {
   908                 writeLock.unlock();
   993                 writeLock.unlock();
   909             }
   994             }
   910         } finally {
   995         } finally {
   948 
  1033 
   949         SecurityManager sm = System.getSecurityManager();
  1034         SecurityManager sm = System.getSecurityManager();
   950         if (sm != null)
  1035         if (sm != null)
   951             sm.checkMulticast(group);
  1036             sm.checkMulticast(group);
   952 
  1037 
   953         synchronized (stateLock) {
  1038         stateLock.lock();
       
  1039         try {
   954             ensureOpen();
  1040             ensureOpen();
   955 
  1041 
   956             // check the registry to see if we are already a member of the group
  1042             // check the registry to see if we are already a member of the group
   957             if (registry == null) {
  1043             if (registry == null) {
   958                 registry = new MembershipRegistry();
  1044                 registry = new MembershipRegistry();
  1003                                                   groupAddress, targetAddress, sourceAddress);
  1089                                                   groupAddress, targetAddress, sourceAddress);
  1004             }
  1090             }
  1005 
  1091 
  1006             registry.add(key);
  1092             registry.add(key);
  1007             return key;
  1093             return key;
       
  1094         } finally {
       
  1095             stateLock.unlock();
  1008         }
  1096         }
  1009     }
  1097     }
  1010 
  1098 
  1011     @Override
  1099     @Override
  1012     public MembershipKey join(InetAddress group,
  1100     public MembershipKey join(InetAddress group,
  1028 
  1116 
  1029     // package-private
  1117     // package-private
  1030     void drop(MembershipKeyImpl key) {
  1118     void drop(MembershipKeyImpl key) {
  1031         assert key.channel() == this;
  1119         assert key.channel() == this;
  1032 
  1120 
  1033         synchronized (stateLock) {
  1121         stateLock.lock();
       
  1122         try {
  1034             if (!key.isValid())
  1123             if (!key.isValid())
  1035                 return;
  1124                 return;
  1036 
  1125 
  1037             try {
  1126             try {
  1038                 if (key instanceof MembershipKeyImpl.Type6) {
  1127                 if (key instanceof MembershipKeyImpl.Type6) {
  1049                 throw new AssertionError(ioe);
  1138                 throw new AssertionError(ioe);
  1050             }
  1139             }
  1051 
  1140 
  1052             key.invalidate();
  1141             key.invalidate();
  1053             registry.remove(key);
  1142             registry.remove(key);
       
  1143         } finally {
       
  1144             stateLock.unlock();
  1054         }
  1145         }
  1055     }
  1146     }
  1056 
  1147 
  1057     /**
  1148     /**
  1058      * Block datagrams from given source if a memory to receive all
  1149      * Block datagrams from given source if a memory to receive all
  1062         throws IOException
  1153         throws IOException
  1063     {
  1154     {
  1064         assert key.channel() == this;
  1155         assert key.channel() == this;
  1065         assert key.sourceAddress() == null;
  1156         assert key.sourceAddress() == null;
  1066 
  1157 
  1067         synchronized (stateLock) {
  1158         stateLock.lock();
       
  1159         try {
  1068             if (!key.isValid())
  1160             if (!key.isValid())
  1069                 throw new IllegalStateException("key is no longer valid");
  1161                 throw new IllegalStateException("key is no longer valid");
  1070             if (source.isAnyLocalAddress())
  1162             if (source.isAnyLocalAddress())
  1071                 throw new IllegalArgumentException("Source address is a wildcard address");
  1163                 throw new IllegalArgumentException("Source address is a wildcard address");
  1072             if (source.isMulticastAddress())
  1164             if (source.isMulticastAddress())
  1088             }
  1180             }
  1089             if (n == IOStatus.UNAVAILABLE) {
  1181             if (n == IOStatus.UNAVAILABLE) {
  1090                 // ancient kernel
  1182                 // ancient kernel
  1091                 throw new UnsupportedOperationException();
  1183                 throw new UnsupportedOperationException();
  1092             }
  1184             }
       
  1185         } finally {
       
  1186             stateLock.unlock();
  1093         }
  1187         }
  1094     }
  1188     }
  1095 
  1189 
  1096     /**
  1190     /**
  1097      * Unblock given source.
  1191      * Unblock given source.
  1098      */
  1192      */
  1099     void unblock(MembershipKeyImpl key, InetAddress source) {
  1193     void unblock(MembershipKeyImpl key, InetAddress source) {
  1100         assert key.channel() == this;
  1194         assert key.channel() == this;
  1101         assert key.sourceAddress() == null;
  1195         assert key.sourceAddress() == null;
  1102 
  1196 
  1103         synchronized (stateLock) {
  1197         stateLock.lock();
       
  1198         try {
  1104             if (!key.isValid())
  1199             if (!key.isValid())
  1105                 throw new IllegalStateException("key is no longer valid");
  1200                 throw new IllegalStateException("key is no longer valid");
  1106 
  1201 
  1107             try {
  1202             try {
  1108                 if (key instanceof MembershipKeyImpl.Type6) {
  1203                 if (key instanceof MembershipKeyImpl.Type6) {
  1118                 }
  1213                 }
  1119             } catch (IOException ioe) {
  1214             } catch (IOException ioe) {
  1120                 // should not happen
  1215                 // should not happen
  1121                 throw new AssertionError(ioe);
  1216                 throw new AssertionError(ioe);
  1122             }
  1217             }
       
  1218         } finally {
       
  1219             stateLock.unlock();
  1123         }
  1220         }
  1124     }
  1221     }
  1125 
  1222 
  1126     /**
  1223     /**
  1127      * Invoked by implCloseChannel to close the channel.
  1224      * Invoked by implCloseChannel to close the channel.
  1142 
  1239 
  1143         boolean blocking;
  1240         boolean blocking;
  1144         boolean interrupted = false;
  1241         boolean interrupted = false;
  1145 
  1242 
  1146         // set state to ST_CLOSING and invalid membership keys
  1243         // set state to ST_CLOSING and invalid membership keys
  1147         synchronized (stateLock) {
  1244         stateLock.lock();
       
  1245         try {
  1148             assert state < ST_CLOSING;
  1246             assert state < ST_CLOSING;
  1149             blocking = isBlocking();
  1247             blocking = isBlocking();
  1150             state = ST_CLOSING;
  1248             state = ST_CLOSING;
  1151 
  1249 
  1152             // if member of any multicast groups then invalidate the keys
  1250             // if member of any multicast groups then invalidate the keys
  1153             if (registry != null)
  1251             if (registry != null)
  1154                 registry.invalidateAll();
  1252                 registry.invalidateAll();
       
  1253         } finally {
       
  1254             stateLock.unlock();
  1155         }
  1255         }
  1156 
  1256 
  1157         // wait for any outstanding I/O operations to complete
  1257         // wait for any outstanding I/O operations to complete
  1158         if (blocking) {
  1258         if (blocking) {
  1159             synchronized (stateLock) {
  1259             stateLock.lock();
       
  1260             try {
  1160                 assert state == ST_CLOSING;
  1261                 assert state == ST_CLOSING;
  1161                 long reader = readerThread;
  1262                 long reader = readerThread;
  1162                 long writer = writerThread;
  1263                 long writer = writerThread;
  1163                 if (reader != 0 || writer != 0) {
  1264                 if (reader != 0 || writer != 0) {
  1164                     nd.preClose(fd);
  1265                     nd.preClose(fd);
  1169                         NativeThread.signal(writer);
  1270                         NativeThread.signal(writer);
  1170 
  1271 
  1171                     // wait for blocking I/O operations to end
  1272                     // wait for blocking I/O operations to end
  1172                     while (readerThread != 0 || writerThread != 0) {
  1273                     while (readerThread != 0 || writerThread != 0) {
  1173                         try {
  1274                         try {
  1174                             stateLock.wait();
  1275                             stateCondition.await();
  1175                         } catch (InterruptedException e) {
  1276                         } catch (InterruptedException e) {
  1176                             interrupted = true;
  1277                             interrupted = true;
  1177                         }
  1278                         }
  1178                     }
  1279                     }
  1179                 }
  1280                 }
       
  1281             } finally {
       
  1282                 stateLock.unlock();
  1180             }
  1283             }
  1181         } else {
  1284         } else {
  1182             // non-blocking mode: wait for read/write to complete
  1285             // non-blocking mode: wait for read/write to complete
  1183             readLock.lock();
  1286             readLock.lock();
  1184             try {
  1287             try {
  1188                 readLock.unlock();
  1291                 readLock.unlock();
  1189             }
  1292             }
  1190         }
  1293         }
  1191 
  1294 
  1192         // set state to ST_KILLPENDING
  1295         // set state to ST_KILLPENDING
  1193         synchronized (stateLock) {
  1296         stateLock.lock();
       
  1297         try {
  1194             assert state == ST_CLOSING;
  1298             assert state == ST_CLOSING;
  1195             state = ST_KILLPENDING;
  1299             state = ST_KILLPENDING;
       
  1300         } finally {
       
  1301             stateLock.unlock();
  1196         }
  1302         }
  1197 
  1303 
  1198         // close socket if not registered with Selector
  1304         // close socket if not registered with Selector
  1199         if (!isRegistered())
  1305         if (!isRegistered())
  1200             kill();
  1306             kill();
  1204             Thread.currentThread().interrupt();
  1310             Thread.currentThread().interrupt();
  1205     }
  1311     }
  1206 
  1312 
  1207     @Override
  1313     @Override
  1208     public void kill() throws IOException {
  1314     public void kill() throws IOException {
  1209         synchronized (stateLock) {
  1315         stateLock.lock();
       
  1316         try {
  1210             if (state == ST_KILLPENDING) {
  1317             if (state == ST_KILLPENDING) {
  1211                 state = ST_KILLED;
  1318                 state = ST_KILLED;
  1212                 try {
  1319                 try {
  1213                     nd.close(fd);
  1320                     nd.close(fd);
  1214                 } finally {
  1321                 } finally {
  1215                     // notify resource manager
  1322                     // notify resource manager
  1216                     ResourceManager.afterUdpClose();
  1323                     ResourceManager.afterUdpClose();
  1217                 }
  1324                 }
  1218             }
  1325             }
       
  1326         } finally {
       
  1327             stateLock.unlock();
  1219         }
  1328         }
  1220     }
  1329     }
  1221 
  1330 
  1222     @SuppressWarnings("deprecation")
  1331     @SuppressWarnings("deprecation")
  1223     protected void finalize() throws IOException {
  1332     protected void finalize() throws IOException {