88 private final ProtocolFamily family; |
88 private final ProtocolFamily family; |
89 |
89 |
90 // Our file descriptor |
90 // Our file descriptor |
91 private final FileDescriptor fd; |
91 private final FileDescriptor fd; |
92 private final int fdVal; |
92 private final int fdVal; |
|
93 |
|
94 // Native buffer for socket address used by receive0, protected by readLock |
|
95 private final NativeSocketAddress socketAddress; |
|
96 |
|
97 // Cleaner to close file descriptor and free native socket address |
93 private final Cleanable cleaner; |
98 private final Cleanable cleaner; |
94 |
|
95 // Cached InetAddress and port for unconnected DatagramChannels |
|
96 // used by receive0 |
|
97 private InetAddress cachedSenderInetAddress; |
|
98 private int cachedSenderPort; |
|
99 |
99 |
100 // Lock held by current reading or connecting thread |
100 // Lock held by current reading or connecting thread |
101 private final ReentrantLock readLock = new ReentrantLock(); |
101 private final ReentrantLock readLock = new ReentrantLock(); |
102 |
102 |
103 // Lock held by current writing or connecting thread |
103 // Lock held by current writing or connecting thread |
152 |
152 |
153 public DatagramChannelImpl(SelectorProvider sp) |
153 public DatagramChannelImpl(SelectorProvider sp) |
154 throws IOException |
154 throws IOException |
155 { |
155 { |
156 super(sp); |
156 super(sp); |
|
157 |
|
158 this.socketAddress = new NativeSocketAddress(); |
|
159 |
157 ResourceManager.beforeUdpCreate(); |
160 ResourceManager.beforeUdpCreate(); |
158 try { |
161 try { |
159 this.family = Net.isIPv6Available() |
162 this.family = Net.isIPv6Available() |
160 ? StandardProtocolFamily.INET6 |
163 ? StandardProtocolFamily.INET6 |
161 : StandardProtocolFamily.INET; |
164 : StandardProtocolFamily.INET; |
162 this.fd = Net.socket(family, false); |
165 this.fd = Net.socket(family, false); |
163 this.fdVal = IOUtil.fdVal(fd); |
166 this.fdVal = IOUtil.fdVal(fd); |
164 } catch (IOException ioe) { |
167 } catch (IOException ioe) { |
165 ResourceManager.afterUdpClose(); |
168 ResourceManager.afterUdpClose(); |
|
169 socketAddress.free(); |
166 throw ioe; |
170 throw ioe; |
167 } |
171 } |
168 this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd)); |
172 |
|
173 Runnable releaser = releaserFor(fd, socketAddress); |
|
174 this.cleaner = CleanerFactory.cleaner().register(this, releaser); |
169 } |
175 } |
170 |
176 |
171 public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family) |
177 public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family) |
172 throws IOException |
178 throws IOException |
173 { |
179 { |
181 if (!Net.isIPv6Available()) { |
187 if (!Net.isIPv6Available()) { |
182 throw new UnsupportedOperationException("IPv6 not available"); |
188 throw new UnsupportedOperationException("IPv6 not available"); |
183 } |
189 } |
184 } |
190 } |
185 |
191 |
|
192 this.socketAddress = new NativeSocketAddress(); |
|
193 |
186 ResourceManager.beforeUdpCreate(); |
194 ResourceManager.beforeUdpCreate(); |
187 try { |
195 try { |
188 this.family = family; |
196 this.family = family; |
189 this.fd = Net.socket(family, false); |
197 this.fd = Net.socket(family, false); |
190 this.fdVal = IOUtil.fdVal(fd); |
198 this.fdVal = IOUtil.fdVal(fd); |
191 } catch (IOException ioe) { |
199 } catch (IOException ioe) { |
192 ResourceManager.afterUdpClose(); |
200 ResourceManager.afterUdpClose(); |
|
201 socketAddress.free(); |
193 throw ioe; |
202 throw ioe; |
194 } |
203 } |
195 this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd)); |
204 |
|
205 Runnable releaser = releaserFor(fd, socketAddress); |
|
206 this.cleaner = CleanerFactory.cleaner().register(this, releaser); |
196 } |
207 } |
197 |
208 |
198 public DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd) |
209 public DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd) |
199 throws IOException |
210 throws IOException |
200 { |
211 { |
201 super(sp); |
212 super(sp); |
|
213 |
|
214 try { |
|
215 this.socketAddress = new NativeSocketAddress(); |
|
216 } catch (OutOfMemoryError e) { |
|
217 nd.close(fd); |
|
218 throw e; |
|
219 } |
202 |
220 |
203 // increment UDP count to match decrement when closing |
221 // increment UDP count to match decrement when closing |
204 ResourceManager.beforeUdpCreate(); |
222 ResourceManager.beforeUdpCreate(); |
205 |
223 |
206 this.family = Net.isIPv6Available() |
224 this.family = Net.isIPv6Available() |
207 ? StandardProtocolFamily.INET6 |
225 ? StandardProtocolFamily.INET6 |
208 : StandardProtocolFamily.INET; |
226 : StandardProtocolFamily.INET; |
209 this.fd = fd; |
227 this.fd = fd; |
210 this.fdVal = IOUtil.fdVal(fd); |
228 this.fdVal = IOUtil.fdVal(fd); |
211 this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd)); |
229 |
|
230 Runnable releaser = releaserFor(fd, socketAddress); |
|
231 this.cleaner = CleanerFactory.cleaner().register(this, releaser); |
|
232 |
212 synchronized (stateLock) { |
233 synchronized (stateLock) { |
213 this.localAddress = Net.localAddress(fd); |
234 this.localAddress = Net.localAddress(fd); |
214 } |
235 } |
215 } |
236 } |
216 |
237 |
448 // remove hook for Thread.interrupt |
469 // remove hook for Thread.interrupt |
449 end(completed); |
470 end(completed); |
450 } |
471 } |
451 } |
472 } |
452 |
473 |
453 private SocketAddress sender; // Set by receive0 (## ugh) |
|
454 |
|
455 @Override |
474 @Override |
456 public SocketAddress receive(ByteBuffer dst) throws IOException { |
475 public SocketAddress receive(ByteBuffer dst) throws IOException { |
457 if (dst.isReadOnly()) |
476 if (dst.isReadOnly()) |
458 throw new IllegalArgumentException("Read-only buffer"); |
477 throw new IllegalArgumentException("Read-only buffer"); |
459 readLock.lock(); |
478 readLock.lock(); |
460 try { |
479 try { |
461 boolean blocking = isBlocking(); |
480 boolean blocking = isBlocking(); |
462 boolean completed = false; |
481 SocketAddress sender = null; |
463 int n = 0; |
|
464 try { |
482 try { |
465 SocketAddress remote = beginRead(blocking, false); |
483 SocketAddress remote = beginRead(blocking, false); |
466 boolean connected = (remote != null); |
484 boolean connected = (remote != null); |
467 SecurityManager sm = System.getSecurityManager(); |
485 SecurityManager sm = System.getSecurityManager(); |
468 |
|
469 if (connected || (sm == null)) { |
486 if (connected || (sm == null)) { |
470 // connected or no security manager |
487 // connected or no security manager |
471 n = receive(dst, connected); |
488 int n = receive(dst, connected); |
472 if (blocking) { |
489 if (blocking) { |
473 while (IOStatus.okayToRetry(n) && isOpen()) { |
490 while (IOStatus.okayToRetry(n) && isOpen()) { |
474 park(Net.POLLIN); |
491 park(Net.POLLIN); |
475 n = receive(dst, connected); |
492 n = receive(dst, connected); |
476 } |
493 } |
477 } |
494 } |
|
495 if (n >= 0) { |
|
496 // sender address is in socket address buffer |
|
497 sender = socketAddress.toInetSocketAddress(); |
|
498 } |
478 } else { |
499 } else { |
479 // security manager and unconnected |
500 // security manager and unconnected |
480 n = untrustedReceive(dst); |
501 sender = untrustedReceive(dst); |
481 } |
502 } |
482 if (n == IOStatus.UNAVAILABLE) |
|
483 return null; |
|
484 completed = (n > 0) || (n == 0 && isOpen()); |
|
485 return sender; |
503 return sender; |
486 } finally { |
504 } finally { |
487 endRead(blocking, completed); |
505 endRead(blocking, (sender != null)); |
488 assert IOStatus.check(n); |
|
489 } |
506 } |
490 } finally { |
507 } finally { |
491 readLock.unlock(); |
508 readLock.unlock(); |
492 } |
509 } |
493 } |
510 } |
496 * Receives a datagram into an untrusted buffer. When there is a security |
513 * Receives a datagram into an untrusted buffer. When there is a security |
497 * manager set, and the socket is not connected, datagrams have to be received |
514 * manager set, and the socket is not connected, datagrams have to be received |
498 * into a buffer that is not accessible to the user. The datagram is copied |
515 * into a buffer that is not accessible to the user. The datagram is copied |
499 * into the user's buffer when the sender address is accepted by the security |
516 * into the user's buffer when the sender address is accepted by the security |
500 * manager. |
517 * manager. |
501 * |
518 */ |
502 * @return the size of the datagram or IOStatus.UNAVAILABLE |
519 private SocketAddress untrustedReceive(ByteBuffer dst) throws IOException { |
503 */ |
|
504 private int untrustedReceive(ByteBuffer dst) throws IOException { |
|
505 SecurityManager sm = System.getSecurityManager(); |
520 SecurityManager sm = System.getSecurityManager(); |
506 assert readLock.isHeldByCurrentThread() |
521 assert readLock.isHeldByCurrentThread() |
507 && sm != null && remoteAddress == null; |
522 && sm != null && remoteAddress == null; |
508 |
523 |
509 ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining()); |
524 ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining()); |
514 if (blocking) { |
529 if (blocking) { |
515 while (IOStatus.okayToRetry(n) && isOpen()) { |
530 while (IOStatus.okayToRetry(n) && isOpen()) { |
516 park(Net.POLLIN); |
531 park(Net.POLLIN); |
517 n = receive(bb, false); |
532 n = receive(bb, false); |
518 } |
533 } |
519 } else if (n == IOStatus.UNAVAILABLE) { |
534 } |
520 return n; |
535 if (n >= 0) { |
521 } |
536 // sender address is in socket address buffer |
522 InetSocketAddress isa = (InetSocketAddress) sender; |
537 InetSocketAddress isa = socketAddress.toInetSocketAddress(); |
523 try { |
538 try { |
524 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); |
539 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); |
525 bb.flip(); |
540 bb.flip(); |
526 dst.put(bb); |
541 dst.put(bb); |
527 return n; |
542 return isa; |
528 } catch (SecurityException se) { |
543 } catch (SecurityException se) { |
529 // ignore datagram |
544 // ignore datagram |
530 bb.clear(); |
545 bb.clear(); |
|
546 } |
|
547 } else { |
|
548 return null; |
531 } |
549 } |
532 } |
550 } |
533 } finally { |
551 } finally { |
534 Util.releaseTemporaryDirectBuffer(bb); |
552 Util.releaseTemporaryDirectBuffer(bb); |
535 } |
553 } |
582 */ |
600 */ |
583 private SocketAddress trustedBlockingReceive(ByteBuffer dst) |
601 private SocketAddress trustedBlockingReceive(ByteBuffer dst) |
584 throws IOException |
602 throws IOException |
585 { |
603 { |
586 assert readLock.isHeldByCurrentThread() && isBlocking(); |
604 assert readLock.isHeldByCurrentThread() && isBlocking(); |
587 boolean completed = false; |
605 SocketAddress sender = null; |
588 int n = 0; |
|
589 try { |
606 try { |
590 SocketAddress remote = beginRead(true, false); |
607 SocketAddress remote = beginRead(true, false); |
591 boolean connected = (remote != null); |
608 boolean connected = (remote != null); |
592 n = receive(dst, connected); |
609 int n = receive(dst, connected); |
593 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
610 while (IOStatus.okayToRetry(n) && isOpen()) { |
594 park(Net.POLLIN); |
611 park(Net.POLLIN); |
595 n = receive(dst, connected); |
612 n = receive(dst, connected); |
596 } |
613 } |
597 completed = (n > 0) || (n == 0 && isOpen()); |
614 if (n >= 0) { |
|
615 // sender address is in socket address buffer |
|
616 sender = socketAddress.toInetSocketAddress(); |
|
617 } |
598 return sender; |
618 return sender; |
599 } finally { |
619 } finally { |
600 endRead(true, completed); |
620 endRead(true, (sender != null)); |
601 assert IOStatus.check(n); |
|
602 } |
621 } |
603 } |
622 } |
604 |
623 |
605 /** |
624 /** |
606 * Receives a datagram into given buffer with a timeout. This method is |
625 * Receives a datagram into given buffer with a timeout. This method is |
609 */ |
628 */ |
610 private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos) |
629 private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos) |
611 throws IOException |
630 throws IOException |
612 { |
631 { |
613 assert readLock.isHeldByCurrentThread() && isBlocking(); |
632 assert readLock.isHeldByCurrentThread() && isBlocking(); |
614 boolean completed = false; |
633 SocketAddress sender = null; |
615 int n = 0; |
|
616 try { |
634 try { |
617 SocketAddress remote = beginRead(true, false); |
635 SocketAddress remote = beginRead(true, false); |
618 boolean connected = (remote != null); |
636 boolean connected = (remote != null); |
619 |
637 |
620 // change socket to non-blocking |
638 // change socket to non-blocking |
621 lockedConfigureBlocking(false); |
639 lockedConfigureBlocking(false); |
622 try { |
640 try { |
623 long startNanos = System.nanoTime(); |
641 long startNanos = System.nanoTime(); |
624 n = receive(dst, connected); |
642 int n = receive(dst, connected); |
625 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
643 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
626 long remainingNanos = nanos - (System.nanoTime() - startNanos); |
644 long remainingNanos = nanos - (System.nanoTime() - startNanos); |
627 if (remainingNanos <= 0) { |
645 if (remainingNanos <= 0) { |
628 throw new SocketTimeoutException("Receive timed out"); |
646 throw new SocketTimeoutException("Receive timed out"); |
629 } |
647 } |
630 park(Net.POLLIN, remainingNanos); |
648 park(Net.POLLIN, remainingNanos); |
631 n = receive(dst, connected); |
649 n = receive(dst, connected); |
632 } |
650 } |
633 completed = (n > 0) || (n == 0 && isOpen()); |
651 if (n >= 0) { |
|
652 // sender address is in socket address buffer |
|
653 sender = socketAddress.toInetSocketAddress(); |
|
654 } |
634 return sender; |
655 return sender; |
635 } finally { |
656 } finally { |
636 // restore socket to blocking mode (if channel is open) |
657 // restore socket to blocking mode (if channel is open) |
637 tryLockedConfigureBlocking(true); |
658 tryLockedConfigureBlocking(true); |
638 } |
659 } |
639 |
|
640 } finally { |
660 } finally { |
641 endRead(true, completed); |
661 endRead(true, (sender != null)); |
642 assert IOStatus.check(n); |
|
643 } |
662 } |
644 } |
663 } |
645 |
664 |
646 private int receive(ByteBuffer dst, boolean connected) throws IOException { |
665 private int receive(ByteBuffer dst, boolean connected) throws IOException { |
647 int pos = dst.position(); |
666 int pos = dst.position(); |
669 |
688 |
670 private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos, |
689 private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos, |
671 boolean connected) |
690 boolean connected) |
672 throws IOException |
691 throws IOException |
673 { |
692 { |
674 int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected); |
693 int n = receive0(fd, |
|
694 ((DirectBuffer)bb).address() + pos, rem, |
|
695 socketAddress.address(), |
|
696 connected); |
675 if (n > 0) |
697 if (n > 0) |
676 bb.position(pos + n); |
698 bb.position(pos + n); |
677 return n; |
699 return n; |
678 } |
700 } |
679 |
701 |
1707 public int getFDVal() { |
1729 public int getFDVal() { |
1708 return fdVal; |
1730 return fdVal; |
1709 } |
1731 } |
1710 |
1732 |
1711 /** |
1733 /** |
1712 * Returns an action to close the given file descriptor. |
1734 * Returns an action to release the given file descriptor and free the |
1713 */ |
1735 * given native socket address. |
1714 private static Runnable closerFor(FileDescriptor fd) { |
1736 */ |
|
1737 private static Runnable releaserFor(FileDescriptor fd, NativeSocketAddress sa) { |
1715 return () -> { |
1738 return () -> { |
1716 try { |
1739 try { |
1717 nd.close(fd); |
1740 nd.close(fd); |
1718 } catch (IOException ioe) { |
1741 } catch (IOException ioe) { |
1719 throw new UncheckedIOException(ioe); |
1742 throw new UncheckedIOException(ioe); |
1720 } finally { |
1743 } finally { |
1721 // decrement |
1744 // decrement socket count and release memory |
1722 ResourceManager.afterUdpClose(); |
1745 ResourceManager.afterUdpClose(); |
|
1746 sa.free(); |
1723 } |
1747 } |
1724 }; |
1748 }; |
1725 } |
1749 } |
1726 |
1750 |
1727 // -- Native methods -- |
1751 // -- Native methods -- |
1728 |
|
1729 private static native void initIDs(); |
|
1730 |
1752 |
1731 private static native void disconnect0(FileDescriptor fd, boolean isIPv6) |
1753 private static native void disconnect0(FileDescriptor fd, boolean isIPv6) |
1732 throws IOException; |
1754 throws IOException; |
1733 |
1755 |
1734 private native int receive0(FileDescriptor fd, long address, int len, |
1756 private static native int receive0(FileDescriptor fd, long address, int len, |
1735 boolean connected) |
1757 long senderAddress, boolean connected) |
1736 throws IOException; |
1758 throws IOException; |
1737 |
1759 |
1738 private native int send0(boolean preferIPv6, FileDescriptor fd, long address, |
1760 private static native int send0(boolean preferIPv6, FileDescriptor fd, |
1739 int len, InetAddress addr, int port) |
1761 long address, int len, InetAddress addr, int port) |
1740 throws IOException; |
1762 throws IOException; |
1741 |
1763 |
1742 static { |
1764 static { |
1743 IOUtil.load(); |
1765 IOUtil.load(); |
1744 initIDs(); |
|
1745 } |
1766 } |
1746 } |
1767 } |