36 |
36 |
37 class IOUtil { |
37 class IOUtil { |
38 |
38 |
39 private IOUtil() { } // No instantiation |
39 private IOUtil() { } // No instantiation |
40 |
40 |
41 /* |
|
42 * Returns the index of first buffer in bufs with remaining, |
|
43 * or -1 if there is nothing left |
|
44 */ |
|
45 private static int remaining(ByteBuffer[] bufs) { |
|
46 int numBufs = bufs.length; |
|
47 for (int i=0; i<numBufs; i++) { |
|
48 if (bufs[i].hasRemaining()) { |
|
49 return i; |
|
50 } |
|
51 } |
|
52 return -1; |
|
53 } |
|
54 |
|
55 /* |
|
56 * Returns a new ByteBuffer array with only unfinished buffers in it |
|
57 */ |
|
58 private static ByteBuffer[] skipBufs(ByteBuffer[] bufs, |
|
59 int nextWithRemaining) |
|
60 { |
|
61 int newSize = bufs.length - nextWithRemaining; |
|
62 ByteBuffer[] temp = new ByteBuffer[newSize]; |
|
63 for (int i=0; i<newSize; i++) { |
|
64 temp[i] = bufs[i + nextWithRemaining]; |
|
65 } |
|
66 return temp; |
|
67 } |
|
68 |
|
69 static int write(FileDescriptor fd, ByteBuffer src, long position, |
41 static int write(FileDescriptor fd, ByteBuffer src, long position, |
70 NativeDispatcher nd, Object lock) |
42 NativeDispatcher nd, Object lock) |
71 throws IOException |
43 throws IOException |
72 { |
44 { |
73 if (src instanceof DirectBuffer) |
45 if (src instanceof DirectBuffer) |
123 } |
95 } |
124 |
96 |
125 static long write(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) |
97 static long write(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) |
126 throws IOException |
98 throws IOException |
127 { |
99 { |
128 int nextWithRemaining = remaining(bufs); |
100 return write(fd, bufs, 0, bufs.length, nd); |
129 // if all bufs are empty we should return immediately |
101 } |
130 if (nextWithRemaining < 0) |
102 |
131 return 0; |
103 static long write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, |
132 // If some bufs are empty we should skip them |
104 NativeDispatcher nd) |
133 if (nextWithRemaining > 0) |
105 throws IOException |
134 bufs = skipBufs(bufs, nextWithRemaining); |
106 { |
135 |
107 IOVecWrapper vec = IOVecWrapper.get(length); |
136 int numBufs = bufs.length; |
108 |
137 |
109 boolean completed = false; |
138 // Create shadow to ensure DirectByteBuffers are used |
110 int iov_len = 0; |
139 ByteBuffer[] shadow = new ByteBuffer[numBufs]; |
|
140 try { |
111 try { |
141 for (int i=0; i<numBufs; i++) { |
112 |
142 if (!(bufs[i] instanceof DirectBuffer)) { |
113 // Iterate over buffers to populate native iovec array. |
143 int pos = bufs[i].position(); |
114 int count = offset + length; |
144 int lim = bufs[i].limit(); |
115 for (int i=offset; i<count; i++) { |
145 assert (pos <= lim); |
116 ByteBuffer buf = bufs[i]; |
146 int rem = (pos <= lim ? lim - pos : 0); |
117 int pos = buf.position(); |
147 |
118 int lim = buf.limit(); |
148 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); |
119 assert (pos <= lim); |
149 shadow[i] = bb; |
120 int rem = (pos <= lim ? lim - pos : 0); |
150 // Leave slow buffer position untouched; it will be updated |
121 if (rem > 0) { |
151 // after we see how many bytes were really written out |
122 vec.setBuffer(iov_len, buf, pos, rem); |
152 bb.put(bufs[i]); |
123 |
153 bufs[i].position(pos); |
124 // allocate shadow buffer to ensure I/O is done with direct buffer |
154 bb.flip(); |
125 if (!(buf instanceof DirectBuffer)) { |
155 } else { |
126 ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem); |
156 shadow[i] = bufs[i]; |
127 shadow.put(buf); |
157 } |
128 shadow.flip(); |
158 } |
129 vec.setShadow(iov_len, shadow); |
159 |
130 buf.position(pos); // temporarily restore position in user buffer |
160 IOVecWrapper vec = null; |
131 buf = shadow; |
161 long bytesWritten = 0; |
132 pos = shadow.position(); |
162 try { |
133 } |
163 // Create a native iovec array |
134 |
164 vec= new IOVecWrapper(numBufs); |
135 vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos); |
165 |
136 vec.putLen(iov_len, rem); |
166 // Fill in the iovec array with appropriate data |
137 iov_len++; |
167 for (int i=0; i<numBufs; i++) { |
138 } |
168 ByteBuffer nextBuffer = shadow[i]; |
139 } |
169 // put in the buffer addresses |
140 if (iov_len == 0) |
170 long pos = nextBuffer.position(); |
141 return 0L; |
171 long len = nextBuffer.limit() - pos; |
142 |
172 vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos); |
143 long bytesWritten = nd.writev(fd, vec.address, iov_len); |
173 vec.putLen(i, len); |
|
174 } |
|
175 |
|
176 // Invoke native call to fill the buffers |
|
177 bytesWritten = nd.writev(fd, vec.address, numBufs); |
|
178 } finally { |
|
179 vec.free(); |
|
180 } |
|
181 long returnVal = bytesWritten; |
|
182 |
144 |
183 // Notify the buffers how many bytes were taken |
145 // Notify the buffers how many bytes were taken |
184 for (int i=0; i<numBufs; i++) { |
146 long left = bytesWritten; |
185 ByteBuffer nextBuffer = bufs[i]; |
147 for (int j=0; j<iov_len; j++) { |
186 int pos = nextBuffer.position(); |
148 if (left > 0) { |
187 int lim = nextBuffer.limit(); |
149 ByteBuffer buf = vec.getBuffer(j); |
188 assert (pos <= lim); |
150 int pos = vec.getPosition(j); |
189 int len = (pos <= lim ? lim - pos : lim); |
151 int rem = vec.getRemaining(j); |
190 if (bytesWritten >= len) { |
152 int n = (left > rem) ? rem : (int)left; |
191 bytesWritten -= len; |
153 buf.position(pos + n); |
192 int newPosition = pos + len; |
154 left -= n; |
193 nextBuffer.position(newPosition); |
155 } |
194 } else { // Buffers not completely filled |
156 // return shadow buffers to buffer pool |
195 if (bytesWritten > 0) { |
157 ByteBuffer shadow = vec.getShadow(j); |
196 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); |
158 if (shadow != null) |
197 int newPosition = (int)(pos + bytesWritten); |
159 Util.offerLastTemporaryDirectBuffer(shadow); |
198 nextBuffer.position(newPosition); |
160 vec.clearRefs(j); |
199 } |
161 } |
200 break; |
162 |
201 } |
163 completed = true; |
202 } |
164 return bytesWritten; |
203 return returnVal; |
165 |
204 } finally { |
166 } finally { |
205 // return any substituted buffers to cache |
167 // if an error occurred then clear refs to buffers and return any shadow |
206 for (int i=0; i<numBufs; i++) { |
168 // buffers to cache |
207 ByteBuffer bb = shadow[i]; |
169 if (!completed) { |
208 if (bb != null && bb != bufs[i]) { |
170 for (int j=0; j<iov_len; j++) { |
209 Util.releaseTemporaryDirectBuffer(bb); |
171 ByteBuffer shadow = vec.getShadow(j); |
|
172 if (shadow != null) |
|
173 Util.offerLastTemporaryDirectBuffer(shadow); |
|
174 vec.clearRefs(j); |
210 } |
175 } |
211 } |
176 } |
212 } |
177 } |
213 } |
178 } |
214 |
179 |
260 } |
225 } |
261 |
226 |
262 static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) |
227 static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) |
263 throws IOException |
228 throws IOException |
264 { |
229 { |
265 int nextWithRemaining = remaining(bufs); |
230 return read(fd, bufs, 0, bufs.length, nd); |
266 // if all bufs are empty we should return immediately |
231 } |
267 if (nextWithRemaining < 0) |
232 |
268 return 0; |
233 static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, |
269 // If some bufs are empty we should skip them |
234 NativeDispatcher nd) |
270 if (nextWithRemaining > 0) |
235 throws IOException |
271 bufs = skipBufs(bufs, nextWithRemaining); |
236 { |
272 |
237 IOVecWrapper vec = IOVecWrapper.get(length); |
273 int numBufs = bufs.length; |
238 |
274 |
239 boolean completed = false; |
275 // Read into the shadow to ensure DirectByteBuffers are used |
240 int iov_len = 0; |
276 ByteBuffer[] shadow = new ByteBuffer[numBufs]; |
|
277 boolean usingSlowBuffers = false; |
|
278 try { |
241 try { |
279 for (int i=0; i<numBufs; i++) { |
242 |
280 if (bufs[i].isReadOnly()) |
243 // Iterate over buffers to populate native iovec array. |
|
244 int count = offset + length; |
|
245 for (int i=offset; i<count; i++) { |
|
246 ByteBuffer buf = bufs[i]; |
|
247 if (buf.isReadOnly()) |
281 throw new IllegalArgumentException("Read-only buffer"); |
248 throw new IllegalArgumentException("Read-only buffer"); |
282 if (!(bufs[i] instanceof DirectBuffer)) { |
249 int pos = buf.position(); |
283 shadow[i] = Util.getTemporaryDirectBuffer(bufs[i].remaining()); |
250 int lim = buf.limit(); |
284 usingSlowBuffers = true; |
251 assert (pos <= lim); |
285 } else { |
252 int rem = (pos <= lim ? lim - pos : 0); |
286 shadow[i] = bufs[i]; |
253 |
287 } |
254 if (rem > 0) { |
288 } |
255 vec.setBuffer(iov_len, buf, pos, rem); |
289 |
256 |
290 IOVecWrapper vec = null; |
257 // allocate shadow buffer to ensure I/O is done with direct buffer |
291 long bytesRead = 0; |
258 if (!(buf instanceof DirectBuffer)) { |
292 try { |
259 ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem); |
293 // Create a native iovec array |
260 vec.setShadow(iov_len, shadow); |
294 vec = new IOVecWrapper(numBufs); |
261 buf = shadow; |
295 |
262 pos = shadow.position(); |
296 // Fill in the iovec array with appropriate data |
263 } |
297 for (int i=0; i<numBufs; i++) { |
264 |
298 ByteBuffer nextBuffer = shadow[i]; |
265 vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos); |
299 // put in the buffer addresses |
266 vec.putLen(iov_len, rem); |
300 long pos = nextBuffer.position(); |
267 iov_len++; |
301 long len = nextBuffer.remaining(); |
268 } |
302 vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos); |
269 } |
303 vec.putLen(i, len); |
270 if (iov_len == 0) |
304 } |
271 return 0L; |
305 |
272 |
306 // Invoke native call to fill the buffers |
273 long bytesRead = nd.readv(fd, vec.address, iov_len); |
307 bytesRead = nd.readv(fd, vec.address, numBufs); |
|
308 } finally { |
|
309 vec.free(); |
|
310 } |
|
311 long returnVal = bytesRead; |
|
312 |
274 |
313 // Notify the buffers how many bytes were read |
275 // Notify the buffers how many bytes were read |
314 for (int i=0; i<numBufs; i++) { |
276 long left = bytesRead; |
315 ByteBuffer nextBuffer = shadow[i]; |
277 for (int j=0; j<iov_len; j++) { |
316 // Note: should this have been cached from above? |
278 ByteBuffer shadow = vec.getShadow(j); |
317 int pos = nextBuffer.position(); |
279 if (left > 0) { |
318 int len = nextBuffer.remaining(); |
280 ByteBuffer buf = vec.getBuffer(j); |
319 if (bytesRead >= len) { |
281 int rem = vec.getRemaining(j); |
320 bytesRead -= len; |
282 int n = (left > rem) ? rem : (int)left; |
321 int newPosition = pos + len; |
283 if (shadow == null) { |
322 nextBuffer.position(newPosition); |
284 int pos = vec.getPosition(j); |
323 } else { // Buffers not completely filled |
285 buf.position(pos + n); |
324 if (bytesRead > 0) { |
286 } else { |
325 assert(pos + bytesRead < (long)Integer.MAX_VALUE); |
287 shadow.limit(shadow.position() + n); |
326 int newPosition = (int)(pos + bytesRead); |
288 buf.put(shadow); |
327 nextBuffer.position(newPosition); |
|
328 } |
289 } |
329 break; |
290 left -= n; |
330 } |
291 } |
331 } |
292 if (shadow != null) |
332 |
293 Util.offerLastTemporaryDirectBuffer(shadow); |
333 // Put results from shadow into the slow buffers |
294 vec.clearRefs(j); |
334 if (usingSlowBuffers) { |
295 } |
335 for (int i=0; i<numBufs; i++) { |
296 |
336 if (!(bufs[i] instanceof DirectBuffer)) { |
297 completed = true; |
337 shadow[i].flip(); |
298 return bytesRead; |
338 bufs[i].put(shadow[i]); |
299 |
339 } |
|
340 } |
|
341 } |
|
342 return returnVal; |
|
343 } finally { |
300 } finally { |
344 // return any substituted buffers to cache |
301 // if an error occurred then clear refs to buffers and return any shadow |
345 if (usingSlowBuffers) { |
302 // buffers to cache |
346 for (int i=0; i<numBufs; i++) { |
303 if (!completed) { |
347 ByteBuffer bb = shadow[i]; |
304 for (int j=0; j<iov_len; j++) { |
348 if (bb != null && bb != bufs[i]) { |
305 ByteBuffer shadow = vec.getShadow(j); |
349 Util.releaseTemporaryDirectBuffer(bb); |
306 if (shadow != null) |
350 } |
307 Util.offerLastTemporaryDirectBuffer(shadow); |
|
308 vec.clearRefs(j); |
351 } |
309 } |
352 } |
310 } |
353 } |
311 } |
354 } |
312 } |
355 |
313 |