51 import java.nio.channels.spi.SelectorProvider; |
51 import java.nio.channels.spi.SelectorProvider; |
52 import java.util.Collections; |
52 import java.util.Collections; |
53 import java.util.HashSet; |
53 import java.util.HashSet; |
54 import java.util.Objects; |
54 import java.util.Objects; |
55 import java.util.Set; |
55 import java.util.Set; |
|
56 import java.util.concurrent.locks.Condition; |
56 import java.util.concurrent.locks.ReentrantLock; |
57 import java.util.concurrent.locks.ReentrantLock; |
57 |
58 |
58 import sun.net.ResourceManager; |
59 import sun.net.ResourceManager; |
59 import sun.net.ext.ExtendedSocketOptions; |
60 import sun.net.ext.ExtendedSocketOptions; |
60 |
61 |
87 // Lock held by current writing or connecting thread |
88 // Lock held by current writing or connecting thread |
88 private final ReentrantLock writeLock = new ReentrantLock(); |
89 private final ReentrantLock writeLock = new ReentrantLock(); |
89 |
90 |
90 // Lock held by any thread that modifies the state fields declared below |
91 // Lock held by any thread that modifies the state fields declared below |
91 // DO NOT invoke a blocking I/O operation while holding this lock! |
92 // DO NOT invoke a blocking I/O operation while holding this lock! |
92 private final Object stateLock = new Object(); |
93 private final ReentrantLock stateLock = new ReentrantLock(); |
|
94 private final Condition stateCondition = stateLock.newCondition(); |
93 |
95 |
94 // -- The following fields are protected by stateLock |
96 // -- The following fields are protected by stateLock |
95 |
97 |
96 // State (does not necessarily increase monotonically) |
98 // State (does not necessarily increase monotonically) |
97 private static final int ST_UNCONNECTED = 0; |
99 private static final int ST_UNCONNECTED = 0; |
177 this.family = Net.isIPv6Available() |
179 this.family = Net.isIPv6Available() |
178 ? StandardProtocolFamily.INET6 |
180 ? StandardProtocolFamily.INET6 |
179 : StandardProtocolFamily.INET; |
181 : StandardProtocolFamily.INET; |
180 this.fd = fd; |
182 this.fd = fd; |
181 this.fdVal = IOUtil.fdVal(fd); |
183 this.fdVal = IOUtil.fdVal(fd); |
182 synchronized (stateLock) { |
184 stateLock.lock(); |
|
185 try { |
183 this.localAddress = Net.localAddress(fd); |
186 this.localAddress = Net.localAddress(fd); |
|
187 } finally { |
|
188 stateLock.unlock(); |
184 } |
189 } |
185 } |
190 } |
186 |
191 |
187 // @throws ClosedChannelException if channel is closed |
192 // @throws ClosedChannelException if channel is closed |
188 private void ensureOpen() throws ClosedChannelException { |
193 private void ensureOpen() throws ClosedChannelException { |
190 throw new ClosedChannelException(); |
195 throw new ClosedChannelException(); |
191 } |
196 } |
192 |
197 |
193 @Override |
198 @Override |
194 public DatagramSocket socket() { |
199 public DatagramSocket socket() { |
195 synchronized (stateLock) { |
200 stateLock.lock(); |
|
201 try { |
196 if (socket == null) |
202 if (socket == null) |
197 socket = DatagramSocketAdaptor.create(this); |
203 socket = DatagramSocketAdaptor.create(this); |
198 return socket; |
204 return socket; |
|
205 } finally { |
|
206 stateLock.unlock(); |
199 } |
207 } |
200 } |
208 } |
201 |
209 |
202 @Override |
210 @Override |
203 public SocketAddress getLocalAddress() throws IOException { |
211 public SocketAddress getLocalAddress() throws IOException { |
204 synchronized (stateLock) { |
212 stateLock.lock(); |
|
213 try { |
205 ensureOpen(); |
214 ensureOpen(); |
206 // Perform security check before returning address |
215 // Perform security check before returning address |
207 return Net.getRevealedLocalAddress(localAddress); |
216 return Net.getRevealedLocalAddress(localAddress); |
|
217 } finally { |
|
218 stateLock.unlock(); |
208 } |
219 } |
209 } |
220 } |
210 |
221 |
211 @Override |
222 @Override |
212 public SocketAddress getRemoteAddress() throws IOException { |
223 public SocketAddress getRemoteAddress() throws IOException { |
213 synchronized (stateLock) { |
224 stateLock.lock(); |
|
225 try { |
214 ensureOpen(); |
226 ensureOpen(); |
215 return remoteAddress; |
227 return remoteAddress; |
|
228 } finally { |
|
229 stateLock.unlock(); |
216 } |
230 } |
217 } |
231 } |
218 |
232 |
219 @Override |
233 @Override |
220 public <T> DatagramChannel setOption(SocketOption<T> name, T value) |
234 public <T> DatagramChannel setOption(SocketOption<T> name, T value) |
222 { |
236 { |
223 Objects.requireNonNull(name); |
237 Objects.requireNonNull(name); |
224 if (!supportedOptions().contains(name)) |
238 if (!supportedOptions().contains(name)) |
225 throw new UnsupportedOperationException("'" + name + "' not supported"); |
239 throw new UnsupportedOperationException("'" + name + "' not supported"); |
226 |
240 |
227 synchronized (stateLock) { |
241 stateLock.lock(); |
|
242 try { |
228 ensureOpen(); |
243 ensureOpen(); |
229 |
244 |
230 if (name == StandardSocketOptions.IP_TOS || |
245 if (name == StandardSocketOptions.IP_TOS || |
231 name == StandardSocketOptions.IP_MULTICAST_TTL || |
246 name == StandardSocketOptions.IP_MULTICAST_TTL || |
232 name == StandardSocketOptions.IP_MULTICAST_LOOP) |
247 name == StandardSocketOptions.IP_MULTICAST_LOOP) |
274 { |
291 { |
275 Objects.requireNonNull(name); |
292 Objects.requireNonNull(name); |
276 if (!supportedOptions().contains(name)) |
293 if (!supportedOptions().contains(name)) |
277 throw new UnsupportedOperationException("'" + name + "' not supported"); |
294 throw new UnsupportedOperationException("'" + name + "' not supported"); |
278 |
295 |
279 synchronized (stateLock) { |
296 stateLock.lock(); |
|
297 try { |
280 ensureOpen(); |
298 ensureOpen(); |
281 |
299 |
282 if (name == StandardSocketOptions.IP_TOS || |
300 if (name == StandardSocketOptions.IP_TOS || |
283 name == StandardSocketOptions.IP_MULTICAST_TTL || |
301 name == StandardSocketOptions.IP_MULTICAST_TTL || |
284 name == StandardSocketOptions.IP_MULTICAST_LOOP) |
302 name == StandardSocketOptions.IP_MULTICAST_LOOP) |
313 return (T)Boolean.valueOf(isReuseAddress); |
331 return (T)Boolean.valueOf(isReuseAddress); |
314 } |
332 } |
315 |
333 |
316 // no special handling |
334 // no special handling |
317 return (T) Net.getSocketOption(fd, Net.UNSPEC, name); |
335 return (T) Net.getSocketOption(fd, Net.UNSPEC, name); |
|
336 } finally { |
|
337 stateLock.unlock(); |
318 } |
338 } |
319 } |
339 } |
320 |
340 |
321 private static class DefaultOptionsHolder { |
341 private static class DefaultOptionsHolder { |
322 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); |
342 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); |
360 if (blocking) { |
380 if (blocking) { |
361 // set hook for Thread.interrupt |
381 // set hook for Thread.interrupt |
362 begin(); |
382 begin(); |
363 } |
383 } |
364 SocketAddress remote; |
384 SocketAddress remote; |
365 synchronized (stateLock) { |
385 stateLock.lock(); |
|
386 try { |
366 ensureOpen(); |
387 ensureOpen(); |
367 remote = remoteAddress; |
388 remote = remoteAddress; |
368 if ((remote == null) && mustBeConnected) |
389 if ((remote == null) && mustBeConnected) |
369 throw new NotYetConnectedException(); |
390 throw new NotYetConnectedException(); |
370 if (localAddress == null) |
391 if (localAddress == null) |
371 bindInternal(null); |
392 bindInternal(null); |
372 if (blocking) |
393 if (blocking) |
373 readerThread = NativeThread.current(); |
394 readerThread = NativeThread.current(); |
|
395 } finally { |
|
396 stateLock.unlock(); |
374 } |
397 } |
375 return remote; |
398 return remote; |
376 } |
399 } |
377 |
400 |
378 /** |
401 /** |
382 */ |
405 */ |
383 private void endRead(boolean blocking, boolean completed) |
406 private void endRead(boolean blocking, boolean completed) |
384 throws AsynchronousCloseException |
407 throws AsynchronousCloseException |
385 { |
408 { |
386 if (blocking) { |
409 if (blocking) { |
387 synchronized (stateLock) { |
410 stateLock.lock(); |
|
411 try { |
388 readerThread = 0; |
412 readerThread = 0; |
389 // notify any thread waiting in implCloseSelectableChannel |
413 // notify any thread waiting in implCloseSelectableChannel |
390 if (state == ST_CLOSING) { |
414 if (state == ST_CLOSING) { |
391 stateLock.notifyAll(); |
415 stateCondition.signalAll(); |
392 } |
416 } |
|
417 } finally { |
|
418 stateLock.unlock(); |
393 } |
419 } |
394 // remove hook for Thread.interrupt |
420 // remove hook for Thread.interrupt |
395 end(completed); |
421 end(completed); |
396 } |
422 } |
397 } |
423 } |
412 SocketAddress remote = beginRead(blocking, false); |
438 SocketAddress remote = beginRead(blocking, false); |
413 boolean connected = (remote != null); |
439 boolean connected = (remote != null); |
414 SecurityManager sm = System.getSecurityManager(); |
440 SecurityManager sm = System.getSecurityManager(); |
415 if (connected || (sm == null)) { |
441 if (connected || (sm == null)) { |
416 // connected or no security manager |
442 // connected or no security manager |
417 do { |
443 n = receive(fd, dst, connected); |
418 n = receive(fd, dst, connected); |
444 if (blocking) { |
419 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
445 while (IOStatus.okayToRetry(n) && isOpen()) { |
420 if (n == IOStatus.UNAVAILABLE) |
446 park(Net.POLLIN); |
|
447 n = receive(fd, dst, connected); |
|
448 } |
|
449 } else if (n == IOStatus.UNAVAILABLE) { |
421 return null; |
450 return null; |
|
451 } |
422 } else { |
452 } else { |
423 // Cannot receive into user's buffer when running with a |
453 // Cannot receive into user's buffer when running with a |
424 // security manager and not connected |
454 // security manager and not connected |
425 bb = Util.getTemporaryDirectBuffer(dst.remaining()); |
455 bb = Util.getTemporaryDirectBuffer(dst.remaining()); |
426 for (;;) { |
456 for (;;) { |
427 do { |
457 n = receive(fd, bb, connected); |
428 n = receive(fd, bb, connected); |
458 if (blocking) { |
429 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
459 while (IOStatus.okayToRetry(n) && isOpen()) { |
430 if (n == IOStatus.UNAVAILABLE) |
460 park(Net.POLLIN); |
|
461 n = receive(fd, bb, connected); |
|
462 } |
|
463 } else if (n == IOStatus.UNAVAILABLE) { |
431 return null; |
464 return null; |
|
465 } |
432 InetSocketAddress isa = (InetSocketAddress)sender; |
466 InetSocketAddress isa = (InetSocketAddress)sender; |
433 try { |
467 try { |
434 sm.checkAccept(isa.getAddress().getHostAddress(), |
468 sm.checkAccept(isa.getAddress().getHostAddress(), |
435 isa.getPort()); |
469 isa.getPort()); |
436 } catch (SecurityException se) { |
470 } catch (SecurityException se) { |
491 if (n > 0) |
525 if (n > 0) |
492 bb.position(pos + n); |
526 bb.position(pos + n); |
493 return n; |
527 return n; |
494 } |
528 } |
495 |
529 |
|
530 @Override |
496 public int send(ByteBuffer src, SocketAddress target) |
531 public int send(ByteBuffer src, SocketAddress target) |
497 throws IOException |
532 throws IOException |
498 { |
533 { |
499 Objects.requireNonNull(src); |
534 Objects.requireNonNull(src); |
500 InetSocketAddress isa = Net.checkAddress(target, family); |
535 InetSocketAddress isa = Net.checkAddress(target, family); |
508 if (remote != null) { |
543 if (remote != null) { |
509 // connected |
544 // connected |
510 if (!target.equals(remote)) { |
545 if (!target.equals(remote)) { |
511 throw new AlreadyConnectedException(); |
546 throw new AlreadyConnectedException(); |
512 } |
547 } |
513 do { |
548 n = IOUtil.write(fd, src, -1, nd); |
514 n = IOUtil.write(fd, src, -1, nd); |
549 if (blocking) { |
515 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
550 while (IOStatus.okayToRetry(n) && isOpen()) { |
|
551 park(Net.POLLOUT); |
|
552 n = IOUtil.write(fd, src, -1, nd); |
|
553 } |
|
554 } |
516 } else { |
555 } else { |
517 // not connected |
556 // not connected |
518 SecurityManager sm = System.getSecurityManager(); |
557 SecurityManager sm = System.getSecurityManager(); |
519 if (sm != null) { |
558 if (sm != null) { |
520 InetAddress ia = isa.getAddress(); |
559 InetAddress ia = isa.getAddress(); |
522 sm.checkMulticast(ia); |
561 sm.checkMulticast(ia); |
523 } else { |
562 } else { |
524 sm.checkConnect(ia.getHostAddress(), isa.getPort()); |
563 sm.checkConnect(ia.getHostAddress(), isa.getPort()); |
525 } |
564 } |
526 } |
565 } |
527 do { |
566 n = send(fd, src, isa); |
528 n = send(fd, src, isa); |
567 if (blocking) { |
529 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
568 while (IOStatus.okayToRetry(n) && isOpen()) { |
|
569 park(Net.POLLOUT); |
|
570 n = send(fd, src, isa); |
|
571 } |
|
572 } |
530 } |
573 } |
531 } finally { |
574 } finally { |
532 endWrite(blocking, n > 0); |
575 endWrite(blocking, n > 0); |
533 assert IOStatus.check(n); |
576 assert IOStatus.check(n); |
534 } |
577 } |
600 try { |
643 try { |
601 boolean blocking = isBlocking(); |
644 boolean blocking = isBlocking(); |
602 int n = 0; |
645 int n = 0; |
603 try { |
646 try { |
604 beginRead(blocking, true); |
647 beginRead(blocking, true); |
605 do { |
648 n = IOUtil.read(fd, buf, -1, nd); |
606 n = IOUtil.read(fd, buf, -1, nd); |
649 if (blocking) { |
607 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
650 while (IOStatus.okayToRetry(n) && isOpen()) { |
608 |
651 park(Net.POLLIN); |
|
652 n = IOUtil.read(fd, buf, -1, nd); |
|
653 } |
|
654 } |
609 } finally { |
655 } finally { |
610 endRead(blocking, n > 0); |
656 endRead(blocking, n > 0); |
611 assert IOStatus.check(n); |
657 assert IOStatus.check(n); |
612 } |
658 } |
613 return IOStatus.normalize(n); |
659 return IOStatus.normalize(n); |
626 try { |
672 try { |
627 boolean blocking = isBlocking(); |
673 boolean blocking = isBlocking(); |
628 long n = 0; |
674 long n = 0; |
629 try { |
675 try { |
630 beginRead(blocking, true); |
676 beginRead(blocking, true); |
631 do { |
677 n = IOUtil.read(fd, dsts, offset, length, nd); |
632 n = IOUtil.read(fd, dsts, offset, length, nd); |
678 if (blocking) { |
633 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
679 while (IOStatus.okayToRetry(n) && isOpen()) { |
634 |
680 park(Net.POLLIN); |
|
681 n = IOUtil.read(fd, dsts, offset, length, nd); |
|
682 } |
|
683 } |
635 } finally { |
684 } finally { |
636 endRead(blocking, n > 0); |
685 endRead(blocking, n > 0); |
637 assert IOStatus.check(n); |
686 assert IOStatus.check(n); |
638 } |
687 } |
639 return IOStatus.normalize(n); |
688 return IOStatus.normalize(n); |
657 if (blocking) { |
706 if (blocking) { |
658 // set hook for Thread.interrupt |
707 // set hook for Thread.interrupt |
659 begin(); |
708 begin(); |
660 } |
709 } |
661 SocketAddress remote; |
710 SocketAddress remote; |
662 synchronized (stateLock) { |
711 stateLock.lock(); |
|
712 try { |
663 ensureOpen(); |
713 ensureOpen(); |
664 remote = remoteAddress; |
714 remote = remoteAddress; |
665 if ((remote == null) && mustBeConnected) |
715 if ((remote == null) && mustBeConnected) |
666 throw new NotYetConnectedException(); |
716 throw new NotYetConnectedException(); |
667 if (localAddress == null) |
717 if (localAddress == null) |
668 bindInternal(null); |
718 bindInternal(null); |
669 if (blocking) |
719 if (blocking) |
670 writerThread = NativeThread.current(); |
720 writerThread = NativeThread.current(); |
|
721 } finally { |
|
722 stateLock.unlock(); |
671 } |
723 } |
672 return remote; |
724 return remote; |
673 } |
725 } |
674 |
726 |
675 /** |
727 /** |
679 */ |
731 */ |
680 private void endWrite(boolean blocking, boolean completed) |
732 private void endWrite(boolean blocking, boolean completed) |
681 throws AsynchronousCloseException |
733 throws AsynchronousCloseException |
682 { |
734 { |
683 if (blocking) { |
735 if (blocking) { |
684 synchronized (stateLock) { |
736 stateLock.lock(); |
|
737 try { |
685 writerThread = 0; |
738 writerThread = 0; |
686 // notify any thread waiting in implCloseSelectableChannel |
739 // notify any thread waiting in implCloseSelectableChannel |
687 if (state == ST_CLOSING) { |
740 if (state == ST_CLOSING) { |
688 stateLock.notifyAll(); |
741 stateCondition.signalAll(); |
689 } |
742 } |
|
743 } finally { |
|
744 stateLock.unlock(); |
690 } |
745 } |
691 // remove hook for Thread.interrupt |
746 // remove hook for Thread.interrupt |
692 end(completed); |
747 end(completed); |
693 } |
748 } |
694 } |
749 } |
701 try { |
756 try { |
702 boolean blocking = isBlocking(); |
757 boolean blocking = isBlocking(); |
703 int n = 0; |
758 int n = 0; |
704 try { |
759 try { |
705 beginWrite(blocking, true); |
760 beginWrite(blocking, true); |
706 do { |
761 n = IOUtil.write(fd, buf, -1, nd); |
707 n = IOUtil.write(fd, buf, -1, nd); |
762 if (blocking) { |
708 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
763 while (IOStatus.okayToRetry(n) && isOpen()) { |
|
764 park(Net.POLLOUT); |
|
765 n = IOUtil.write(fd, buf, -1, nd); |
|
766 } |
|
767 } |
709 } finally { |
768 } finally { |
710 endWrite(blocking, n > 0); |
769 endWrite(blocking, n > 0); |
711 assert IOStatus.check(n); |
770 assert IOStatus.check(n); |
712 } |
771 } |
713 return IOStatus.normalize(n); |
772 return IOStatus.normalize(n); |
726 try { |
785 try { |
727 boolean blocking = isBlocking(); |
786 boolean blocking = isBlocking(); |
728 long n = 0; |
787 long n = 0; |
729 try { |
788 try { |
730 beginWrite(blocking, true); |
789 beginWrite(blocking, true); |
731 do { |
790 n = IOUtil.write(fd, srcs, offset, length, nd); |
732 n = IOUtil.write(fd, srcs, offset, length, nd); |
791 if (blocking) { |
733 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
792 while (IOStatus.okayToRetry(n) && isOpen()) { |
|
793 park(Net.POLLOUT); |
|
794 n = IOUtil.write(fd, srcs, offset, length, nd); |
|
795 } |
|
796 } |
734 } finally { |
797 } finally { |
735 endWrite(blocking, n > 0); |
798 endWrite(blocking, n > 0); |
736 assert IOStatus.check(n); |
799 assert IOStatus.check(n); |
737 } |
800 } |
738 return IOStatus.normalize(n); |
801 return IOStatus.normalize(n); |
745 protected void implConfigureBlocking(boolean block) throws IOException { |
808 protected void implConfigureBlocking(boolean block) throws IOException { |
746 readLock.lock(); |
809 readLock.lock(); |
747 try { |
810 try { |
748 writeLock.lock(); |
811 writeLock.lock(); |
749 try { |
812 try { |
750 synchronized (stateLock) { |
813 stateLock.lock(); |
|
814 try { |
751 ensureOpen(); |
815 ensureOpen(); |
752 IOUtil.configureBlocking(fd, block); |
816 IOUtil.configureBlocking(fd, block); |
|
817 } finally { |
|
818 stateLock.unlock(); |
753 } |
819 } |
754 } finally { |
820 } finally { |
755 writeLock.unlock(); |
821 writeLock.unlock(); |
756 } |
822 } |
757 } finally { |
823 } finally { |
758 readLock.unlock(); |
824 readLock.unlock(); |
759 } |
825 } |
760 } |
826 } |
761 |
827 |
762 InetSocketAddress localAddress() { |
828 InetSocketAddress localAddress() { |
763 synchronized (stateLock) { |
829 stateLock.lock(); |
|
830 try { |
764 return localAddress; |
831 return localAddress; |
|
832 } finally { |
|
833 stateLock.unlock(); |
765 } |
834 } |
766 } |
835 } |
767 |
836 |
768 InetSocketAddress remoteAddress() { |
837 InetSocketAddress remoteAddress() { |
769 synchronized (stateLock) { |
838 stateLock.lock(); |
|
839 try { |
770 return remoteAddress; |
840 return remoteAddress; |
|
841 } finally { |
|
842 stateLock.unlock(); |
771 } |
843 } |
772 } |
844 } |
773 |
845 |
774 @Override |
846 @Override |
775 public DatagramChannel bind(SocketAddress local) throws IOException { |
847 public DatagramChannel bind(SocketAddress local) throws IOException { |
776 readLock.lock(); |
848 readLock.lock(); |
777 try { |
849 try { |
778 writeLock.lock(); |
850 writeLock.lock(); |
779 try { |
851 try { |
780 synchronized (stateLock) { |
852 stateLock.lock(); |
|
853 try { |
781 ensureOpen(); |
854 ensureOpen(); |
782 if (localAddress != null) |
855 if (localAddress != null) |
783 throw new AlreadyBoundException(); |
856 throw new AlreadyBoundException(); |
784 bindInternal(local); |
857 bindInternal(local); |
|
858 } finally { |
|
859 stateLock.unlock(); |
785 } |
860 } |
786 } finally { |
861 } finally { |
787 writeLock.unlock(); |
862 writeLock.unlock(); |
788 } |
863 } |
789 } finally { |
864 } finally { |
791 } |
866 } |
792 return this; |
867 return this; |
793 } |
868 } |
794 |
869 |
795 private void bindInternal(SocketAddress local) throws IOException { |
870 private void bindInternal(SocketAddress local) throws IOException { |
796 assert Thread.holdsLock(stateLock) && (localAddress == null); |
871 assert stateLock.isHeldByCurrentThread() && (localAddress == null); |
797 |
872 |
798 InetSocketAddress isa; |
873 InetSocketAddress isa; |
799 if (local == null) { |
874 if (local == null) { |
800 // only Inet4Address allowed with IPv4 socket |
875 // only Inet4Address allowed with IPv4 socket |
801 if (family == StandardProtocolFamily.INET) { |
876 if (family == StandardProtocolFamily.INET) { |
863 if (blocking) { |
942 if (blocking) { |
864 IOUtil.configureBlocking(fd, false); |
943 IOUtil.configureBlocking(fd, false); |
865 } |
944 } |
866 try { |
945 try { |
867 ByteBuffer buf = ByteBuffer.allocate(100); |
946 ByteBuffer buf = ByteBuffer.allocate(100); |
868 while (receive(buf) != null) { |
947 while (receive(fd, buf, false) > 0) { |
869 buf.clear(); |
948 buf.clear(); |
870 } |
949 } |
871 } finally { |
950 } finally { |
872 if (blocking) { |
951 if (blocking) { |
873 IOUtil.configureBlocking(fd, true); |
952 IOUtil.configureBlocking(fd, true); |
874 } |
953 } |
875 } |
954 } |
|
955 |
|
956 } finally { |
|
957 stateLock.unlock(); |
876 } |
958 } |
877 } finally { |
959 } finally { |
878 writeLock.unlock(); |
960 writeLock.unlock(); |
879 } |
961 } |
880 } finally { |
962 } finally { |
948 |
1033 |
949 SecurityManager sm = System.getSecurityManager(); |
1034 SecurityManager sm = System.getSecurityManager(); |
950 if (sm != null) |
1035 if (sm != null) |
951 sm.checkMulticast(group); |
1036 sm.checkMulticast(group); |
952 |
1037 |
953 synchronized (stateLock) { |
1038 stateLock.lock(); |
|
1039 try { |
954 ensureOpen(); |
1040 ensureOpen(); |
955 |
1041 |
956 // check the registry to see if we are already a member of the group |
1042 // check the registry to see if we are already a member of the group |
957 if (registry == null) { |
1043 if (registry == null) { |
958 registry = new MembershipRegistry(); |
1044 registry = new MembershipRegistry(); |
1062 throws IOException |
1153 throws IOException |
1063 { |
1154 { |
1064 assert key.channel() == this; |
1155 assert key.channel() == this; |
1065 assert key.sourceAddress() == null; |
1156 assert key.sourceAddress() == null; |
1066 |
1157 |
1067 synchronized (stateLock) { |
1158 stateLock.lock(); |
|
1159 try { |
1068 if (!key.isValid()) |
1160 if (!key.isValid()) |
1069 throw new IllegalStateException("key is no longer valid"); |
1161 throw new IllegalStateException("key is no longer valid"); |
1070 if (source.isAnyLocalAddress()) |
1162 if (source.isAnyLocalAddress()) |
1071 throw new IllegalArgumentException("Source address is a wildcard address"); |
1163 throw new IllegalArgumentException("Source address is a wildcard address"); |
1072 if (source.isMulticastAddress()) |
1164 if (source.isMulticastAddress()) |
1088 } |
1180 } |
1089 if (n == IOStatus.UNAVAILABLE) { |
1181 if (n == IOStatus.UNAVAILABLE) { |
1090 // ancient kernel |
1182 // ancient kernel |
1091 throw new UnsupportedOperationException(); |
1183 throw new UnsupportedOperationException(); |
1092 } |
1184 } |
|
1185 } finally { |
|
1186 stateLock.unlock(); |
1093 } |
1187 } |
1094 } |
1188 } |
1095 |
1189 |
1096 /** |
1190 /** |
1097 * Unblock given source. |
1191 * Unblock given source. |
1098 */ |
1192 */ |
1099 void unblock(MembershipKeyImpl key, InetAddress source) { |
1193 void unblock(MembershipKeyImpl key, InetAddress source) { |
1100 assert key.channel() == this; |
1194 assert key.channel() == this; |
1101 assert key.sourceAddress() == null; |
1195 assert key.sourceAddress() == null; |
1102 |
1196 |
1103 synchronized (stateLock) { |
1197 stateLock.lock(); |
|
1198 try { |
1104 if (!key.isValid()) |
1199 if (!key.isValid()) |
1105 throw new IllegalStateException("key is no longer valid"); |
1200 throw new IllegalStateException("key is no longer valid"); |
1106 |
1201 |
1107 try { |
1202 try { |
1108 if (key instanceof MembershipKeyImpl.Type6) { |
1203 if (key instanceof MembershipKeyImpl.Type6) { |
1142 |
1239 |
1143 boolean blocking; |
1240 boolean blocking; |
1144 boolean interrupted = false; |
1241 boolean interrupted = false; |
1145 |
1242 |
1146 // set state to ST_CLOSING and invalid membership keys |
1243 // set state to ST_CLOSING and invalid membership keys |
1147 synchronized (stateLock) { |
1244 stateLock.lock(); |
|
1245 try { |
1148 assert state < ST_CLOSING; |
1246 assert state < ST_CLOSING; |
1149 blocking = isBlocking(); |
1247 blocking = isBlocking(); |
1150 state = ST_CLOSING; |
1248 state = ST_CLOSING; |
1151 |
1249 |
1152 // if member of any multicast groups then invalidate the keys |
1250 // if member of any multicast groups then invalidate the keys |
1153 if (registry != null) |
1251 if (registry != null) |
1154 registry.invalidateAll(); |
1252 registry.invalidateAll(); |
|
1253 } finally { |
|
1254 stateLock.unlock(); |
1155 } |
1255 } |
1156 |
1256 |
1157 // wait for any outstanding I/O operations to complete |
1257 // wait for any outstanding I/O operations to complete |
1158 if (blocking) { |
1258 if (blocking) { |
1159 synchronized (stateLock) { |
1259 stateLock.lock(); |
|
1260 try { |
1160 assert state == ST_CLOSING; |
1261 assert state == ST_CLOSING; |
1161 long reader = readerThread; |
1262 long reader = readerThread; |
1162 long writer = writerThread; |
1263 long writer = writerThread; |
1163 if (reader != 0 || writer != 0) { |
1264 if (reader != 0 || writer != 0) { |
1164 nd.preClose(fd); |
1265 nd.preClose(fd); |
1169 NativeThread.signal(writer); |
1270 NativeThread.signal(writer); |
1170 |
1271 |
1171 // wait for blocking I/O operations to end |
1272 // wait for blocking I/O operations to end |
1172 while (readerThread != 0 || writerThread != 0) { |
1273 while (readerThread != 0 || writerThread != 0) { |
1173 try { |
1274 try { |
1174 stateLock.wait(); |
1275 stateCondition.await(); |
1175 } catch (InterruptedException e) { |
1276 } catch (InterruptedException e) { |
1176 interrupted = true; |
1277 interrupted = true; |
1177 } |
1278 } |
1178 } |
1279 } |
1179 } |
1280 } |
|
1281 } finally { |
|
1282 stateLock.unlock(); |
1180 } |
1283 } |
1181 } else { |
1284 } else { |
1182 // non-blocking mode: wait for read/write to complete |
1285 // non-blocking mode: wait for read/write to complete |
1183 readLock.lock(); |
1286 readLock.lock(); |
1184 try { |
1287 try { |
1188 readLock.unlock(); |
1291 readLock.unlock(); |
1189 } |
1292 } |
1190 } |
1293 } |
1191 |
1294 |
1192 // set state to ST_KILLPENDING |
1295 // set state to ST_KILLPENDING |
1193 synchronized (stateLock) { |
1296 stateLock.lock(); |
|
1297 try { |
1194 assert state == ST_CLOSING; |
1298 assert state == ST_CLOSING; |
1195 state = ST_KILLPENDING; |
1299 state = ST_KILLPENDING; |
|
1300 } finally { |
|
1301 stateLock.unlock(); |
1196 } |
1302 } |
1197 |
1303 |
1198 // close socket if not registered with Selector |
1304 // close socket if not registered with Selector |
1199 if (!isRegistered()) |
1305 if (!isRegistered()) |
1200 kill(); |
1306 kill(); |
1204 Thread.currentThread().interrupt(); |
1310 Thread.currentThread().interrupt(); |
1205 } |
1311 } |
1206 |
1312 |
1207 @Override |
1313 @Override |
1208 public void kill() throws IOException { |
1314 public void kill() throws IOException { |
1209 synchronized (stateLock) { |
1315 stateLock.lock(); |
|
1316 try { |
1210 if (state == ST_KILLPENDING) { |
1317 if (state == ST_KILLPENDING) { |
1211 state = ST_KILLED; |
1318 state = ST_KILLED; |
1212 try { |
1319 try { |
1213 nd.close(fd); |
1320 nd.close(fd); |
1214 } finally { |
1321 } finally { |
1215 // notify resource manager |
1322 // notify resource manager |
1216 ResourceManager.afterUdpClose(); |
1323 ResourceManager.afterUdpClose(); |
1217 } |
1324 } |
1218 } |
1325 } |
|
1326 } finally { |
|
1327 stateLock.unlock(); |
1219 } |
1328 } |
1220 } |
1329 } |
1221 |
1330 |
1222 @SuppressWarnings("deprecation") |
1331 @SuppressWarnings("deprecation") |
1223 protected void finalize() throws IOException { |
1332 protected void finalize() throws IOException { |