26 package sun.nio.ch; |
26 package sun.nio.ch; |
27 |
27 |
28 import java.io.FileDescriptor; |
28 import java.io.FileDescriptor; |
29 import java.io.IOException; |
29 import java.io.IOException; |
30 import java.nio.ByteBuffer; |
30 import java.nio.ByteBuffer; |
|
31 import java.nio.channels.AsynchronousCloseException; |
31 import java.nio.channels.ClosedChannelException; |
32 import java.nio.channels.ClosedChannelException; |
|
33 import java.nio.channels.NotYetConnectedException; |
32 import java.nio.channels.Pipe; |
34 import java.nio.channels.Pipe; |
33 import java.nio.channels.SelectionKey; |
35 import java.nio.channels.SelectionKey; |
34 import java.nio.channels.spi.SelectorProvider; |
36 import java.nio.channels.spi.SelectorProvider; |
|
37 import java.util.Objects; |
35 import java.util.concurrent.locks.ReentrantLock; |
38 import java.util.concurrent.locks.ReentrantLock; |
36 |
|
37 |
39 |
38 class SourceChannelImpl |
40 class SourceChannelImpl |
39 extends Pipe.SourceChannel |
41 extends Pipe.SourceChannel |
40 implements SelChImpl |
42 implements SelChImpl |
41 { |
43 { |
42 |
|
43 // Used to make native read and write calls |
44 // Used to make native read and write calls |
44 private static final NativeDispatcher nd = new FileDispatcherImpl(); |
45 private static final NativeDispatcher nd = new FileDispatcherImpl(); |
45 |
46 |
46 // The file descriptor associated with this channel |
47 // The file descriptor associated with this channel |
47 private final FileDescriptor fd; |
48 private final FileDescriptor fd; |
48 |
|
49 // fd value needed for dev/poll. This value will remain valid |
|
50 // even after the value in the file descriptor object has been set to -1 |
|
51 private final int fdVal; |
49 private final int fdVal; |
52 |
|
53 // ID of native thread doing read, for signalling |
|
54 private volatile long thread; |
|
55 |
50 |
56 // Lock held by current reading thread |
51 // Lock held by current reading thread |
57 private final ReentrantLock readLock = new ReentrantLock(); |
52 private final ReentrantLock readLock = new ReentrantLock(); |
58 |
53 |
59 // Lock held by any thread that modifies the state fields declared below |
54 // Lock held by any thread that modifies the state fields declared below |
61 private final Object stateLock = new Object(); |
56 private final Object stateLock = new Object(); |
62 |
57 |
63 // -- The following fields are protected by stateLock |
58 // -- The following fields are protected by stateLock |
64 |
59 |
65 // Channel state |
60 // Channel state |
66 private static final int ST_UNINITIALIZED = -1; |
|
67 private static final int ST_INUSE = 0; |
61 private static final int ST_INUSE = 0; |
68 private static final int ST_KILLED = 1; |
62 private static final int ST_CLOSING = 1; |
69 private volatile int state = ST_UNINITIALIZED; |
63 private static final int ST_KILLPENDING = 2; |
|
64 private static final int ST_KILLED = 3; |
|
65 private int state; |
|
66 |
|
67 // ID of native thread doing read, for signalling |
|
68 private long thread; |
70 |
69 |
71 // -- End of fields protected by stateLock |
70 // -- End of fields protected by stateLock |
72 |
71 |
73 |
72 |
74 public FileDescriptor getFD() { |
73 public FileDescriptor getFD() { |
81 |
80 |
82 SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) { |
81 SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) { |
83 super(sp); |
82 super(sp); |
84 this.fd = fd; |
83 this.fd = fd; |
85 this.fdVal = IOUtil.fdVal(fd); |
84 this.fdVal = IOUtil.fdVal(fd); |
86 this.state = ST_INUSE; |
85 } |
87 } |
86 |
88 |
87 /** |
|
88 * Invoked by implCloseChannel to close the channel. |
|
89 */ |
|
90 @Override |
89 protected void implCloseSelectableChannel() throws IOException { |
91 protected void implCloseSelectableChannel() throws IOException { |
90 synchronized (stateLock) { |
92 assert !isOpen(); |
91 if (state != ST_KILLED) |
93 |
92 nd.preClose(fd); |
94 boolean interrupted = false; |
93 long th = thread; |
95 boolean blocking; |
94 if (th != 0) |
96 |
95 NativeThread.signal(th); |
97 // set state to ST_CLOSING |
96 if (!isRegistered()) |
98 synchronized (stateLock) { |
97 kill(); |
99 assert state < ST_CLOSING; |
98 } |
100 state = ST_CLOSING; |
99 } |
101 blocking = isBlocking(); |
100 |
102 } |
|
103 |
|
104 // wait for any outstanding read to complete |
|
105 if (blocking) { |
|
106 synchronized (stateLock) { |
|
107 assert state == ST_CLOSING; |
|
108 long th = thread; |
|
109 if (th != 0) { |
|
110 nd.preClose(fd); |
|
111 NativeThread.signal(th); |
|
112 |
|
113 // wait for read operation to end |
|
114 while (thread != 0) { |
|
115 try { |
|
116 stateLock.wait(); |
|
117 } catch (InterruptedException e) { |
|
118 interrupted = true; |
|
119 } |
|
120 } |
|
121 } |
|
122 } |
|
123 } else { |
|
124 // non-blocking mode: wait for read to complete |
|
125 readLock.lock(); |
|
126 readLock.unlock(); |
|
127 } |
|
128 |
|
129 // set state to ST_KILLPENDING |
|
130 synchronized (stateLock) { |
|
131 assert state == ST_CLOSING; |
|
132 state = ST_KILLPENDING; |
|
133 } |
|
134 |
|
135 // close socket if not registered with Selector |
|
136 if (!isRegistered()) |
|
137 kill(); |
|
138 |
|
139 // restore interrupt status |
|
140 if (interrupted) |
|
141 Thread.currentThread().interrupt(); |
|
142 } |
|
143 |
|
144 @Override |
101 public void kill() throws IOException { |
145 public void kill() throws IOException { |
102 synchronized (stateLock) { |
146 synchronized (stateLock) { |
103 if (state == ST_KILLED) |
147 assert thread == 0; |
104 return; |
148 if (state == ST_KILLPENDING) { |
105 if (state == ST_UNINITIALIZED) { |
|
106 state = ST_KILLED; |
149 state = ST_KILLED; |
107 return; |
150 nd.close(fd); |
108 } |
151 } |
109 assert !isOpen() && !isRegistered(); |
152 } |
110 nd.close(fd); |
153 } |
111 state = ST_KILLED; |
154 |
112 } |
155 @Override |
113 } |
|
114 |
|
115 protected void implConfigureBlocking(boolean block) throws IOException { |
156 protected void implConfigureBlocking(boolean block) throws IOException { |
116 IOUtil.configureBlocking(fd, block); |
157 readLock.lock(); |
|
158 try { |
|
159 synchronized (stateLock) { |
|
160 IOUtil.configureBlocking(fd, block); |
|
161 } |
|
162 } finally { |
|
163 readLock.unlock(); |
|
164 } |
117 } |
165 } |
118 |
166 |
119 public boolean translateReadyOps(int ops, int initialOps, |
167 public boolean translateReadyOps(int ops, int initialOps, |
120 SelectionKeyImpl sk) { |
168 SelectionKeyImpl sk) { |
121 int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes |
169 int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes |
151 if (ops == SelectionKey.OP_READ) |
199 if (ops == SelectionKey.OP_READ) |
152 ops = Net.POLLIN; |
200 ops = Net.POLLIN; |
153 sk.selector.putEventOps(sk, ops); |
201 sk.selector.putEventOps(sk, ops); |
154 } |
202 } |
155 |
203 |
156 private void ensureOpen() throws IOException { |
204 /** |
157 if (!isOpen()) |
205 * Marks the beginning of a read operation that might block. |
158 throw new ClosedChannelException(); |
206 * |
159 } |
207 * @throws ClosedChannelException if the channel is closed |
160 |
208 * @throws NotYetConnectedException if the channel is not yet connected |
|
209 */ |
|
210 private void beginRead(boolean blocking) throws ClosedChannelException { |
|
211 if (blocking) { |
|
212 // set hook for Thread.interrupt |
|
213 begin(); |
|
214 } |
|
215 synchronized (stateLock) { |
|
216 if (!isOpen()) |
|
217 throw new ClosedChannelException(); |
|
218 if (blocking) |
|
219 thread = NativeThread.current(); |
|
220 } |
|
221 } |
|
222 |
|
223 /** |
|
224 * Marks the end of a read operation that may have blocked. |
|
225 * |
|
226 * @throws AsynchronousCloseException if the channel was closed due to this |
|
227 * thread being interrupted on a blocking read operation. |
|
228 */ |
|
229 private void endRead(boolean blocking, boolean completed) |
|
230 throws AsynchronousCloseException |
|
231 { |
|
232 if (blocking) { |
|
233 synchronized (stateLock) { |
|
234 thread = 0; |
|
235 // notify any thread waiting in implCloseSelectableChannel |
|
236 if (state == ST_CLOSING) { |
|
237 stateLock.notifyAll(); |
|
238 } |
|
239 } |
|
240 // remove hook for Thread.interrupt |
|
241 end(completed); |
|
242 } |
|
243 } |
|
244 |
|
245 @Override |
161 public int read(ByteBuffer dst) throws IOException { |
246 public int read(ByteBuffer dst) throws IOException { |
|
247 Objects.requireNonNull(dst); |
162 |
248 |
163 readLock.lock(); |
249 readLock.lock(); |
164 try { |
250 try { |
165 ensureOpen(); |
251 boolean blocking = isBlocking(); |
166 int n = 0; |
252 int n = 0; |
167 try { |
253 try { |
168 begin(); |
254 beginRead(blocking); |
169 if (!isOpen()) |
|
170 return 0; |
|
171 thread = NativeThread.current(); |
|
172 do { |
255 do { |
173 n = IOUtil.read(fd, dst, -1, nd); |
256 n = IOUtil.read(fd, dst, -1, nd); |
174 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
257 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
175 return IOStatus.normalize(n); |
|
176 } finally { |
258 } finally { |
177 thread = 0; |
259 endRead(blocking, n > 0); |
178 end((n > 0) || (n == IOStatus.UNAVAILABLE)); |
|
179 assert IOStatus.check(n); |
260 assert IOStatus.check(n); |
180 } |
261 } |
|
262 return IOStatus.normalize(n); |
181 } finally { |
263 } finally { |
182 readLock.unlock(); |
264 readLock.unlock(); |
183 } |
265 } |
184 } |
266 } |
185 |
267 |
186 public long read(ByteBuffer[] dsts, int offset, int length) |
268 @Override |
187 throws IOException |
269 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { |
188 { |
270 Objects.checkFromIndexSize(offset, length, dsts.length); |
189 if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) |
|
190 throw new IndexOutOfBoundsException(); |
|
191 return read(Util.subsequence(dsts, offset, length)); |
|
192 } |
|
193 |
|
194 public long read(ByteBuffer[] dsts) throws IOException { |
|
195 if (dsts == null) |
|
196 throw new NullPointerException(); |
|
197 |
271 |
198 readLock.lock(); |
272 readLock.lock(); |
199 try { |
273 try { |
200 ensureOpen(); |
274 boolean blocking = isBlocking(); |
201 long n = 0; |
275 long n = 0; |
202 try { |
276 try { |
203 begin(); |
277 beginRead(blocking); |
204 if (!isOpen()) |
|
205 return 0; |
|
206 thread = NativeThread.current(); |
|
207 do { |
278 do { |
208 n = IOUtil.read(fd, dsts, nd); |
279 n = IOUtil.read(fd, dsts, offset, length, nd); |
209 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
280 } while ((n == IOStatus.INTERRUPTED) && isOpen()); |
210 return IOStatus.normalize(n); |
|
211 } finally { |
281 } finally { |
212 thread = 0; |
282 endRead(blocking, n > 0); |
213 end((n > 0) || (n == IOStatus.UNAVAILABLE)); |
|
214 assert IOStatus.check(n); |
283 assert IOStatus.check(n); |
215 } |
284 } |
|
285 return IOStatus.normalize(n); |
216 } finally { |
286 } finally { |
217 readLock.unlock(); |
287 readLock.unlock(); |
218 } |
288 } |
219 } |
289 } |
|
290 |
|
291 @Override |
|
292 public long read(ByteBuffer[] dsts) throws IOException { |
|
293 return read(dsts, 0, dsts.length); |
|
294 } |
220 } |
295 } |