62 |
62 |
63 /** |
63 /** |
64 * An implementation of SocketChannels |
64 * An implementation of SocketChannels |
65 */ |
65 */ |
66 |
66 |
67 class SocketChannelImpl |
67 public abstract class SocketChannelImpl |
68 extends SocketChannel |
68 extends SocketChannel |
69 implements SelChImpl |
69 implements SelChImpl |
70 { |
70 { |
71 // Used to make native read and write calls |
71 // Used to make native read and write calls |
72 private static final NativeDispatcher nd = new SocketDispatcher(); |
72 private static final NativeDispatcher nd = new SocketDispatcher(); |
73 |
73 |
74 // Our file descriptor object |
74 // Our file descriptor object |
75 private final FileDescriptor fd; |
75 final FileDescriptor fd; |
76 private final int fdVal; |
76 private final int fdVal; |
77 |
77 |
78 // Lock held by current reading or connecting thread |
78 // Lock held by current reading or connecting thread |
79 private final ReentrantLock readLock = new ReentrantLock(); |
79 final ReentrantLock readLock = new ReentrantLock(); |
80 |
80 |
81 // Lock held by current writing or connecting thread |
81 // Lock held by current writing or connecting thread |
82 private final ReentrantLock writeLock = new ReentrantLock(); |
82 final ReentrantLock writeLock = new ReentrantLock(); |
83 |
83 |
84 // Lock held by any thread that modifies the state fields declared below |
84 // Lock held by any thread that modifies the state fields declared below |
85 // DO NOT invoke a blocking I/O operation while holding this lock! |
85 // DO NOT invoke a blocking I/O operation while holding this lock! |
86 private final Object stateLock = new Object(); |
86 final Object stateLock = new Object(); |
87 |
87 |
88 // Input/Output closed |
88 // Input/Output closed |
89 private volatile boolean isInputClosed; |
89 volatile boolean isInputClosed; |
90 private volatile boolean isOutputClosed; |
90 volatile boolean isOutputClosed; |
91 |
91 |
92 // Connection reset protected by readLock |
92 // Connection reset protected by readLock |
93 private boolean connectionReset; |
93 private boolean connectionReset; |
94 |
94 |
95 // -- The following fields are protected by stateLock |
95 // -- The following fields are protected by stateLock |
96 |
96 |
97 // set true when exclusive binding is on and SO_REUSEADDR is emulated |
|
98 private boolean isReuseAddress; |
|
99 |
|
100 // State, increases monotonically |
97 // State, increases monotonically |
101 private static final int ST_UNCONNECTED = 0; |
98 static final int ST_UNCONNECTED = 0; |
102 private static final int ST_CONNECTIONPENDING = 1; |
99 static final int ST_CONNECTIONPENDING = 1; |
103 private static final int ST_CONNECTED = 2; |
100 static final int ST_CONNECTED = 2; |
104 private static final int ST_CLOSING = 3; |
101 static final int ST_CLOSING = 3; |
105 private static final int ST_CLOSED = 4; |
102 static final int ST_CLOSED = 4; |
106 private volatile int state; // need stateLock to change |
103 volatile int state; // need stateLock to change |
107 |
104 |
108 // IDs of native threads doing reads and writes, for signalling |
105 // IDs of native threads doing reads and writes, for signalling |
109 private long readerThread; |
106 private long readerThread; |
110 private long writerThread; |
107 private long writerThread; |
111 |
108 |
112 // Binding |
109 // Binding |
113 private InetSocketAddress localAddress; |
110 SocketAddress localAddress; |
114 private InetSocketAddress remoteAddress; |
111 SocketAddress remoteAddress; |
115 |
112 |
116 // Socket adaptor, created on demand |
113 // Socket adaptor, created on demand |
117 private Socket socket; |
114 Socket socket; |
118 |
115 |
119 // -- End of fields protected by stateLock |
116 // -- End of fields protected by stateLock |
120 |
117 |
121 |
118 |
122 // Constructor for normal connecting sockets |
119 // Constructor for normal connecting sockets |
123 // |
120 // |
124 SocketChannelImpl(SelectorProvider sp) throws IOException { |
121 public SocketChannelImpl(SelectorProvider sp) throws IOException { |
125 super(sp); |
122 super(sp); |
126 this.fd = Net.socket(true); |
123 this.fd = Net.socket(true); |
127 this.fdVal = IOUtil.fdVal(fd); |
124 this.fdVal = IOUtil.fdVal(fd); |
128 } |
125 } |
129 |
126 |
130 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) |
127 public SocketChannelImpl(SelectorProvider sp, FileDescriptor fd) |
131 throws IOException |
128 throws IOException |
132 { |
129 { |
133 super(sp); |
130 super(sp); |
134 this.fd = fd; |
131 this.fd = fd; |
135 this.fdVal = IOUtil.fdVal(fd); |
132 this.fdVal = IOUtil.fdVal(fd); |
136 if (bound) { |
|
137 synchronized (stateLock) { |
|
138 this.localAddress = Net.localAddress(fd); |
|
139 } |
|
140 } |
|
141 } |
|
142 |
|
143 // Constructor for sockets obtained from server sockets |
|
144 // |
|
145 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) |
|
146 throws IOException |
|
147 { |
|
148 super(sp); |
|
149 this.fd = fd; |
|
150 this.fdVal = IOUtil.fdVal(fd); |
|
151 synchronized (stateLock) { |
|
152 this.localAddress = Net.localAddress(fd); |
|
153 this.remoteAddress = isa; |
|
154 this.state = ST_CONNECTED; |
|
155 } |
|
156 } |
133 } |
157 |
134 |
158 /** |
135 /** |
159 * Checks that the channel is open. |
136 * Checks that the channel is open. |
160 * |
137 * |
161 * @throws ClosedChannelException if channel is closed (or closing) |
138 * @throws ClosedChannelException if channel is closed (or closing) |
162 */ |
139 */ |
163 private void ensureOpen() throws ClosedChannelException { |
140 void ensureOpen() throws ClosedChannelException { |
164 if (!isOpen()) |
141 if (!isOpen()) |
165 throw new ClosedChannelException(); |
142 throw new ClosedChannelException(); |
166 } |
143 } |
167 |
144 |
168 /** |
145 /** |
182 if (state < ST_CONNECTED) { |
159 if (state < ST_CONNECTED) { |
183 throw new NotYetConnectedException(); |
160 throw new NotYetConnectedException(); |
184 } else if (state > ST_CONNECTED) { |
161 } else if (state > ST_CONNECTED) { |
185 throw new ClosedChannelException(); |
162 throw new ClosedChannelException(); |
186 } |
163 } |
187 } |
|
188 |
|
189 @Override |
|
190 public Socket socket() { |
|
191 synchronized (stateLock) { |
|
192 if (socket == null) |
|
193 socket = SocketAdaptor.create(this); |
|
194 return socket; |
|
195 } |
|
196 } |
|
197 |
|
198 @Override |
|
199 public SocketAddress getLocalAddress() throws IOException { |
|
200 synchronized (stateLock) { |
|
201 ensureOpen(); |
|
202 return Net.getRevealedLocalAddress(localAddress); |
|
203 } |
|
204 } |
|
205 |
|
206 @Override |
|
207 public SocketAddress getRemoteAddress() throws IOException { |
|
208 synchronized (stateLock) { |
|
209 ensureOpen(); |
|
210 return remoteAddress; |
|
211 } |
|
212 } |
|
213 |
|
214 @Override |
|
215 public <T> SocketChannel setOption(SocketOption<T> name, T value) |
|
216 throws IOException |
|
217 { |
|
218 Objects.requireNonNull(name); |
|
219 if (!supportedOptions().contains(name)) |
|
220 throw new UnsupportedOperationException("'" + name + "' not supported"); |
|
221 if (!name.type().isInstance(value)) |
|
222 throw new IllegalArgumentException("Invalid value '" + value + "'"); |
|
223 |
|
224 synchronized (stateLock) { |
|
225 ensureOpen(); |
|
226 |
|
227 if (name == StandardSocketOptions.IP_TOS) { |
|
228 ProtocolFamily family = Net.isIPv6Available() ? |
|
229 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; |
|
230 Net.setSocketOption(fd, family, name, value); |
|
231 return this; |
|
232 } |
|
233 |
|
234 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { |
|
235 // SO_REUSEADDR emulated when using exclusive bind |
|
236 isReuseAddress = (Boolean)value; |
|
237 return this; |
|
238 } |
|
239 |
|
240 // no options that require special handling |
|
241 Net.setSocketOption(fd, name, value); |
|
242 return this; |
|
243 } |
|
244 } |
|
245 |
|
246 @Override |
|
247 @SuppressWarnings("unchecked") |
|
248 public <T> T getOption(SocketOption<T> name) |
|
249 throws IOException |
|
250 { |
|
251 Objects.requireNonNull(name); |
|
252 if (!supportedOptions().contains(name)) |
|
253 throw new UnsupportedOperationException("'" + name + "' not supported"); |
|
254 |
|
255 synchronized (stateLock) { |
|
256 ensureOpen(); |
|
257 |
|
258 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { |
|
259 // SO_REUSEADDR emulated when using exclusive bind |
|
260 return (T)Boolean.valueOf(isReuseAddress); |
|
261 } |
|
262 |
|
263 // special handling for IP_TOS: always return 0 when IPv6 |
|
264 if (name == StandardSocketOptions.IP_TOS) { |
|
265 ProtocolFamily family = Net.isIPv6Available() ? |
|
266 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; |
|
267 return (T) Net.getSocketOption(fd, family, name); |
|
268 } |
|
269 |
|
270 // no options that require special handling |
|
271 return (T) Net.getSocketOption(fd, name); |
|
272 } |
|
273 } |
|
274 |
|
275 private static class DefaultOptionsHolder { |
|
276 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); |
|
277 |
|
278 private static Set<SocketOption<?>> defaultOptions() { |
|
279 HashSet<SocketOption<?>> set = new HashSet<>(); |
|
280 set.add(StandardSocketOptions.SO_SNDBUF); |
|
281 set.add(StandardSocketOptions.SO_RCVBUF); |
|
282 set.add(StandardSocketOptions.SO_KEEPALIVE); |
|
283 set.add(StandardSocketOptions.SO_REUSEADDR); |
|
284 if (Net.isReusePortAvailable()) { |
|
285 set.add(StandardSocketOptions.SO_REUSEPORT); |
|
286 } |
|
287 set.add(StandardSocketOptions.SO_LINGER); |
|
288 set.add(StandardSocketOptions.TCP_NODELAY); |
|
289 // additional options required by socket adaptor |
|
290 set.add(StandardSocketOptions.IP_TOS); |
|
291 set.add(ExtendedSocketOption.SO_OOBINLINE); |
|
292 set.addAll(ExtendedSocketOptions.clientSocketOptions()); |
|
293 return Collections.unmodifiableSet(set); |
|
294 } |
|
295 } |
|
296 |
|
297 @Override |
|
298 public final Set<SocketOption<?>> supportedOptions() { |
|
299 return DefaultOptionsHolder.defaultOptions; |
|
300 } |
164 } |
301 |
165 |
302 /** |
166 /** |
303 * Marks the beginning of a read operation that might block. |
167 * Marks the beginning of a read operation that might block. |
304 * |
168 * |
584 } |
420 } |
585 |
421 |
586 /** |
422 /** |
587 * Returns the local address, or null if not bound |
423 * Returns the local address, or null if not bound |
588 */ |
424 */ |
589 InetSocketAddress localAddress() { |
425 SocketAddress localAddress() { |
590 synchronized (stateLock) { |
426 synchronized (stateLock) { |
591 return localAddress; |
427 return localAddress; |
592 } |
428 } |
593 } |
429 } |
594 |
430 |
595 /** |
431 /** |
596 * Returns the remote address, or null if not connected |
432 * Returns the remote address, or null if not connected |
597 */ |
433 */ |
598 InetSocketAddress remoteAddress() { |
434 SocketAddress remoteAddress() { |
599 synchronized (stateLock) { |
435 synchronized (stateLock) { |
600 return remoteAddress; |
436 return remoteAddress; |
601 } |
437 } |
602 } |
|
603 |
|
604 @Override |
|
605 public SocketChannel bind(SocketAddress local) throws IOException { |
|
606 readLock.lock(); |
|
607 try { |
|
608 writeLock.lock(); |
|
609 try { |
|
610 synchronized (stateLock) { |
|
611 ensureOpen(); |
|
612 if (state == ST_CONNECTIONPENDING) |
|
613 throw new ConnectionPendingException(); |
|
614 if (localAddress != null) |
|
615 throw new AlreadyBoundException(); |
|
616 InetSocketAddress isa = (local == null) ? |
|
617 new InetSocketAddress(0) : Net.checkAddress(local); |
|
618 SecurityManager sm = System.getSecurityManager(); |
|
619 if (sm != null) { |
|
620 sm.checkListen(isa.getPort()); |
|
621 } |
|
622 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); |
|
623 Net.bind(fd, isa.getAddress(), isa.getPort()); |
|
624 localAddress = Net.localAddress(fd); |
|
625 } |
|
626 } finally { |
|
627 writeLock.unlock(); |
|
628 } |
|
629 } finally { |
|
630 readLock.unlock(); |
|
631 } |
|
632 return this; |
|
633 } |
438 } |
634 |
439 |
635 @Override |
440 @Override |
636 public boolean isConnected() { |
441 public boolean isConnected() { |
637 return (state == ST_CONNECTED); |
442 return (state == ST_CONNECTED); |
684 * |
491 * |
685 * @throws AsynchronousCloseException if the channel was closed due to this |
492 * @throws AsynchronousCloseException if the channel was closed due to this |
686 * thread being interrupted on a blocking connect operation. |
493 * thread being interrupted on a blocking connect operation. |
687 * @throws IOException if completed and unable to obtain the local address |
494 * @throws IOException if completed and unable to obtain the local address |
688 */ |
495 */ |
689 private void endConnect(boolean blocking, boolean completed) |
496 void endConnect(boolean blocking, boolean completed) |
690 throws IOException |
497 throws IOException |
691 { |
498 { |
692 endRead(blocking, completed); |
499 endRead(blocking, completed); |
693 |
500 |
694 if (completed) { |
501 if (completed) { |
695 synchronized (stateLock) { |
502 synchronized (stateLock) { |
696 if (state == ST_CONNECTIONPENDING) { |
503 if (state == ST_CONNECTIONPENDING) { |
697 localAddress = Net.localAddress(fd); |
504 localAddress = localAddressImpl(fd); |
698 state = ST_CONNECTED; |
505 state = ST_CONNECTED; |
699 } |
506 } |
700 } |
507 } |
701 } |
508 } |
702 } |
509 } |
703 |
510 |
704 /** |
511 /** |
705 * Checks the remote address to which this channel is to be connected. |
512 * Checks the remote address to which this channel is to be connected. |
706 */ |
513 */ |
707 private InetSocketAddress checkRemote(SocketAddress sa) throws IOException { |
514 abstract SocketAddress checkRemote(SocketAddress sa) throws IOException; |
708 InetSocketAddress isa = Net.checkAddress(sa); |
515 |
709 SecurityManager sm = System.getSecurityManager(); |
516 abstract int connectImpl(FileDescriptor fd,SocketAddress sa) throws IOException; |
710 if (sm != null) { |
|
711 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); |
|
712 } |
|
713 if (isa.getAddress().isAnyLocalAddress()) { |
|
714 return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort()); |
|
715 } else { |
|
716 return isa; |
|
717 } |
|
718 } |
|
719 |
517 |
720 @Override |
518 @Override |
721 public boolean connect(SocketAddress remote) throws IOException { |
519 public boolean connect(SocketAddress remote) throws IOException { |
722 InetSocketAddress isa = checkRemote(remote); |
520 SocketAddress sa = checkRemote(remote); |
723 try { |
521 try { |
724 readLock.lock(); |
522 readLock.lock(); |
725 try { |
523 try { |
726 writeLock.lock(); |
524 writeLock.lock(); |
727 try { |
525 try { |
728 boolean blocking = isBlocking(); |
526 boolean blocking = isBlocking(); |
729 boolean connected = false; |
527 boolean connected = false; |
730 try { |
528 try { |
731 beginConnect(blocking, isa); |
529 beginConnect(blocking, sa); |
732 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); |
530 int n = connectImpl(fd, sa); |
733 if (n > 0) { |
531 if (n > 0) { |
734 connected = true; |
532 connected = true; |
735 } else if (blocking) { |
533 } else if (blocking) { |
736 assert IOStatus.okayToRetry(n); |
534 assert IOStatus.okayToRetry(n); |
737 boolean polled = false; |
535 boolean polled = false; |
1032 * |
840 * |
1033 * @throws IllegalBlockingModeException if the channel is non-blocking |
841 * @throws IllegalBlockingModeException if the channel is non-blocking |
1034 * @throws SocketTimeoutException if the read timeout elapses |
842 * @throws SocketTimeoutException if the read timeout elapses |
1035 */ |
843 */ |
1036 void blockingConnect(SocketAddress remote, long nanos) throws IOException { |
844 void blockingConnect(SocketAddress remote, long nanos) throws IOException { |
1037 InetSocketAddress isa = checkRemote(remote); |
845 SocketAddress sa = checkRemote(remote); |
1038 try { |
846 try { |
1039 readLock.lock(); |
847 readLock.lock(); |
1040 try { |
848 try { |
1041 writeLock.lock(); |
849 writeLock.lock(); |
1042 try { |
850 try { |
1043 if (!isBlocking()) |
851 if (!isBlocking()) |
1044 throw new IllegalBlockingModeException(); |
852 throw new IllegalBlockingModeException(); |
1045 boolean connected = false; |
853 boolean connected = false; |
1046 try { |
854 try { |
1047 beginConnect(true, isa); |
855 beginConnect(true, sa); |
1048 // change socket to non-blocking |
856 // change socket to non-blocking |
1049 lockedConfigureBlocking(false); |
857 lockedConfigureBlocking(false); |
1050 try { |
858 try { |
1051 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); |
859 int n = connectImpl(fd, sa); |
1052 connected = (n > 0) ? true : finishTimedConnect(nanos); |
860 connected = (n > 0) ? true : finishTimedConnect(nanos); |
1053 } finally { |
861 } finally { |
1054 // restore socket to blocking mode |
862 // restore socket to blocking mode |
1055 lockedConfigureBlocking(true); |
863 lockedConfigureBlocking(true); |
1056 } |
864 } |