33 import java.nio.channels.NotYetConnectedException; |
33 import java.nio.channels.NotYetConnectedException; |
34 import java.nio.channels.Pipe; |
34 import java.nio.channels.Pipe; |
35 import java.nio.channels.SelectionKey; |
35 import java.nio.channels.SelectionKey; |
36 import java.nio.channels.spi.SelectorProvider; |
36 import java.nio.channels.spi.SelectorProvider; |
37 import java.util.Objects; |
37 import java.util.Objects; |
38 import java.util.concurrent.locks.Condition; |
|
39 import java.util.concurrent.locks.ReentrantLock; |
38 import java.util.concurrent.locks.ReentrantLock; |
40 |
39 |
41 class SinkChannelImpl |
40 class SinkChannelImpl |
42 extends Pipe.SinkChannel |
41 extends Pipe.SinkChannel |
43 implements SelChImpl |
42 implements SelChImpl |
52 // Lock held by current writing thread |
51 // Lock held by current writing thread |
53 private final ReentrantLock writeLock = new ReentrantLock(); |
52 private final ReentrantLock writeLock = new ReentrantLock(); |
54 |
53 |
55 // Lock held by any thread that modifies the state fields declared below |
54 // Lock held by any thread that modifies the state fields declared below |
56 // DO NOT invoke a blocking I/O operation while holding this lock! |
55 // DO NOT invoke a blocking I/O operation while holding this lock! |
57 private final ReentrantLock stateLock = new ReentrantLock(); |
56 private final Object stateLock = new Object(); |
58 private final Condition stateCondition = stateLock.newCondition(); |
|
59 |
57 |
60 // -- The following fields are protected by stateLock |
58 // -- The following fields are protected by stateLock |
61 |
59 |
62 // Channel state |
60 // Channel state |
63 private static final int ST_INUSE = 0; |
61 private static final int ST_INUSE = 0; |
64 private static final int ST_CLOSING = 1; |
62 private static final int ST_CLOSING = 1; |
65 private static final int ST_KILLPENDING = 2; |
63 private static final int ST_CLOSED = 2; |
66 private static final int ST_KILLED = 3; |
|
67 private int state; |
64 private int state; |
68 |
65 |
69 // ID of native thread doing write, for signalling |
66 // ID of native thread doing write, for signalling |
70 private long thread; |
67 private long thread; |
71 |
68 |
85 this.fd = fd; |
82 this.fd = fd; |
86 this.fdVal = IOUtil.fdVal(fd); |
83 this.fdVal = IOUtil.fdVal(fd); |
87 } |
84 } |
88 |
85 |
89 /** |
86 /** |
90 * Invoked by implCloseChannel to close the channel. |
87 * Closes the write end of the pipe if there are no write operation in |
91 */ |
88 * progress and the channel is not registered with a Selector. |
92 @Override |
89 */ |
93 protected void implCloseSelectableChannel() throws IOException { |
90 private boolean tryClose() throws IOException { |
94 assert !isOpen(); |
91 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; |
95 |
92 if (thread == 0 && !isRegistered()) { |
96 boolean interrupted = false; |
93 state = ST_CLOSED; |
97 boolean blocking; |
94 nd.close(fd); |
98 |
95 return true; |
99 // set state to ST_CLOSING |
96 } else { |
100 stateLock.lock(); |
97 return false; |
|
98 } |
|
99 } |
|
100 |
|
101 /** |
|
102 * Invokes tryClose to attempt to close the write end of the pipe. |
|
103 * |
|
104 * This method is used for deferred closing by I/O and Selector operations. |
|
105 */ |
|
106 private void tryFinishClose() { |
101 try { |
107 try { |
|
108 tryClose(); |
|
109 } catch (IOException ignore) { } |
|
110 } |
|
111 |
|
112 /** |
|
113 * Closes this channel when configured in blocking mode. |
|
114 * |
|
115 * If there is a write operation in progress then the write-end of the pipe |
|
116 * is pre-closed and the writer is signalled, in which case the final close |
|
117 * is deferred until the writer aborts. |
|
118 */ |
|
119 private void implCloseBlockingMode() throws IOException { |
|
120 synchronized (stateLock) { |
102 assert state < ST_CLOSING; |
121 assert state < ST_CLOSING; |
103 state = ST_CLOSING; |
122 state = ST_CLOSING; |
104 blocking = isBlocking(); |
123 if (!tryClose()) { |
105 } finally { |
|
106 stateLock.unlock(); |
|
107 } |
|
108 |
|
109 // wait for any outstanding write to complete |
|
110 if (blocking) { |
|
111 stateLock.lock(); |
|
112 try { |
|
113 assert state == ST_CLOSING; |
|
114 long th = thread; |
124 long th = thread; |
115 if (th != 0) { |
125 if (th != 0) { |
116 nd.preClose(fd); |
126 nd.preClose(fd); |
117 NativeThread.signal(th); |
127 NativeThread.signal(th); |
118 |
|
119 // wait for write operation to end |
|
120 while (thread != 0) { |
|
121 try { |
|
122 stateCondition.await(); |
|
123 } catch (InterruptedException e) { |
|
124 interrupted = true; |
|
125 } |
|
126 } |
|
127 } |
128 } |
128 } finally { |
129 } |
129 stateLock.unlock(); |
130 } |
130 } |
131 } |
|
132 |
|
133 /** |
|
134 * Closes this channel when configured in non-blocking mode. |
|
135 * |
|
136 * If the channel is registered with a Selector then the close is deferred |
|
137 * until the channel is flushed from all Selectors. |
|
138 */ |
|
139 private void implCloseNonBlockingMode() throws IOException { |
|
140 synchronized (stateLock) { |
|
141 assert state < ST_CLOSING; |
|
142 state = ST_CLOSING; |
|
143 } |
|
144 // wait for any write operation to complete before trying to close |
|
145 writeLock.lock(); |
|
146 writeLock.unlock(); |
|
147 synchronized (stateLock) { |
|
148 if (state == ST_CLOSING) { |
|
149 tryClose(); |
|
150 } |
|
151 } |
|
152 } |
|
153 |
|
154 /** |
|
155 * Invoked by implCloseChannel to close the channel. |
|
156 */ |
|
157 @Override |
|
158 protected void implCloseSelectableChannel() throws IOException { |
|
159 assert !isOpen(); |
|
160 if (isBlocking()) { |
|
161 implCloseBlockingMode(); |
131 } else { |
162 } else { |
132 // non-blocking mode: wait for write to complete |
163 implCloseNonBlockingMode(); |
133 writeLock.lock(); |
164 } |
134 writeLock.unlock(); |
165 } |
135 } |
166 |
136 |
167 @Override |
137 // set state to ST_KILLPENDING |
168 public void kill() { |
138 stateLock.lock(); |
169 synchronized (stateLock) { |
139 try { |
170 if (state == ST_CLOSING) { |
140 assert state == ST_CLOSING; |
171 tryFinishClose(); |
141 state = ST_KILLPENDING; |
172 } |
142 } finally { |
|
143 stateLock.unlock(); |
|
144 } |
|
145 |
|
146 // close socket if not registered with Selector |
|
147 if (!isRegistered()) |
|
148 kill(); |
|
149 |
|
150 // restore interrupt status |
|
151 if (interrupted) |
|
152 Thread.currentThread().interrupt(); |
|
153 } |
|
154 |
|
155 @Override |
|
156 public void kill() throws IOException { |
|
157 stateLock.lock(); |
|
158 try { |
|
159 assert thread == 0; |
|
160 if (state == ST_KILLPENDING) { |
|
161 state = ST_KILLED; |
|
162 nd.close(fd); |
|
163 } |
|
164 } finally { |
|
165 stateLock.unlock(); |
|
166 } |
173 } |
167 } |
174 } |
168 |
175 |
169 @Override |
176 @Override |
170 protected void implConfigureBlocking(boolean block) throws IOException { |
177 protected void implConfigureBlocking(boolean block) throws IOException { |
171 writeLock.lock(); |
178 writeLock.lock(); |
172 try { |
179 try { |
173 stateLock.lock(); |
180 synchronized (stateLock) { |
174 try { |
181 if (!isOpen()) |
|
182 throw new ClosedChannelException(); |
175 IOUtil.configureBlocking(fd, block); |
183 IOUtil.configureBlocking(fd, block); |
176 } finally { |
|
177 stateLock.unlock(); |
|
178 } |
184 } |
179 } finally { |
185 } finally { |
180 writeLock.unlock(); |
186 writeLock.unlock(); |
181 } |
187 } |
182 } |
188 } |