136 // If some bufs are empty we should skip them |
132 // If some bufs are empty we should skip them |
137 if (nextWithRemaining > 0) |
133 if (nextWithRemaining > 0) |
138 bufs = skipBufs(bufs, nextWithRemaining); |
134 bufs = skipBufs(bufs, nextWithRemaining); |
139 |
135 |
140 int numBufs = bufs.length; |
136 int numBufs = bufs.length; |
141 int bytesReadyToWrite = 0; |
|
142 |
137 |
143 // Create shadow to ensure DirectByteBuffers are used |
138 // Create shadow to ensure DirectByteBuffers are used |
144 ByteBuffer[] shadow = new ByteBuffer[numBufs]; |
139 ByteBuffer[] shadow = new ByteBuffer[numBufs]; |
145 for (int i=0; i<numBufs; i++) { |
140 try { |
146 if (!(bufs[i] instanceof DirectBuffer)) { |
141 for (int i=0; i<numBufs; i++) { |
147 int pos = bufs[i].position(); |
142 if (!(bufs[i] instanceof DirectBuffer)) { |
148 int lim = bufs[i].limit(); |
143 int pos = bufs[i].position(); |
|
144 int lim = bufs[i].limit(); |
|
145 assert (pos <= lim); |
|
146 int rem = (pos <= lim ? lim - pos : 0); |
|
147 |
|
148 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); |
|
149 shadow[i] = bb; |
|
150 // Leave slow buffer position untouched; it will be updated |
|
151 // after we see how many bytes were really written out |
|
152 bb.put(bufs[i]); |
|
153 bufs[i].position(pos); |
|
154 bb.flip(); |
|
155 } else { |
|
156 shadow[i] = bufs[i]; |
|
157 } |
|
158 } |
|
159 |
|
160 IOVecWrapper vec = null; |
|
161 long bytesWritten = 0; |
|
162 try { |
|
163 // Create a native iovec array |
|
164 vec= new IOVecWrapper(numBufs); |
|
165 |
|
166 // Fill in the iovec array with appropriate data |
|
167 for (int i=0; i<numBufs; i++) { |
|
168 ByteBuffer nextBuffer = shadow[i]; |
|
169 // put in the buffer addresses |
|
170 long pos = nextBuffer.position(); |
|
171 long len = nextBuffer.limit() - pos; |
|
172 vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos); |
|
173 vec.putLen(i, len); |
|
174 } |
|
175 |
|
176 // Invoke native call to fill the buffers |
|
177 bytesWritten = nd.writev(fd, vec.address, numBufs); |
|
178 } finally { |
|
179 vec.free(); |
|
180 } |
|
181 long returnVal = bytesWritten; |
|
182 |
|
183 // Notify the buffers how many bytes were taken |
|
184 for (int i=0; i<numBufs; i++) { |
|
185 ByteBuffer nextBuffer = bufs[i]; |
|
186 int pos = nextBuffer.position(); |
|
187 int lim = nextBuffer.limit(); |
149 assert (pos <= lim); |
188 assert (pos <= lim); |
150 int rem = (pos <= lim ? lim - pos : 0); |
189 int len = (pos <= lim ? lim - pos : lim); |
151 |
190 if (bytesWritten >= len) { |
152 ByteBuffer bb = ByteBuffer.allocateDirect(rem); |
191 bytesWritten -= len; |
153 shadow[i] = bb; |
192 int newPosition = pos + len; |
154 // Leave slow buffer position untouched; it will be updated |
193 nextBuffer.position(newPosition); |
155 // after we see how many bytes were really written out |
194 } else { // Buffers not completely filled |
156 bb.put(bufs[i]); |
195 if (bytesWritten > 0) { |
157 bufs[i].position(pos); |
196 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); |
158 bb.flip(); |
197 int newPosition = (int)(pos + bytesWritten); |
159 } else { |
198 nextBuffer.position(newPosition); |
160 shadow[i] = bufs[i]; |
199 } |
161 } |
200 break; |
162 } |
201 } |
163 |
202 } |
164 IOVecWrapper vec = null; |
203 return returnVal; |
165 long bytesWritten = 0; |
|
166 try { |
|
167 // Create a native iovec array |
|
168 vec= new IOVecWrapper(numBufs); |
|
169 |
|
170 // Fill in the iovec array with appropriate data |
|
171 for (int i=0; i<numBufs; i++) { |
|
172 ByteBuffer nextBuffer = shadow[i]; |
|
173 // put in the buffer addresses |
|
174 long pos = nextBuffer.position(); |
|
175 long len = nextBuffer.limit() - pos; |
|
176 bytesReadyToWrite += len; |
|
177 vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos); |
|
178 vec.putLen(i, len); |
|
179 } |
|
180 |
|
181 // Invoke native call to fill the buffers |
|
182 bytesWritten = nd.writev(fd, vec.address, numBufs); |
|
183 } finally { |
204 } finally { |
184 vec.free(); |
205 // return any substituted buffers to cache |
185 } |
206 for (int i=0; i<numBufs; i++) { |
186 long returnVal = bytesWritten; |
207 ByteBuffer bb = shadow[i]; |
187 |
208 if (bb != null && bb != bufs[i]) { |
188 // Notify the buffers how many bytes were taken |
209 Util.releaseTemporaryDirectBuffer(bb); |
189 for (int i=0; i<numBufs; i++) { |
210 } |
190 ByteBuffer nextBuffer = bufs[i]; |
211 } |
191 int pos = nextBuffer.position(); |
212 } |
192 int lim = nextBuffer.limit(); |
|
193 assert (pos <= lim); |
|
194 int len = (pos <= lim ? lim - pos : lim); |
|
195 if (bytesWritten >= len) { |
|
196 bytesWritten -= len; |
|
197 int newPosition = pos + len; |
|
198 nextBuffer.position(newPosition); |
|
199 } else { // Buffers not completely filled |
|
200 if (bytesWritten > 0) { |
|
201 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); |
|
202 int newPosition = (int)(pos + bytesWritten); |
|
203 nextBuffer.position(newPosition); |
|
204 } |
|
205 break; |
|
206 } |
|
207 } |
|
208 return returnVal; |
|
209 } |
213 } |
210 |
214 |
211 static int read(FileDescriptor fd, ByteBuffer dst, long position, |
215 static int read(FileDescriptor fd, ByteBuffer dst, long position, |
212 NativeDispatcher nd, Object lock) |
216 NativeDispatcher nd, Object lock) |
213 throws IOException |
217 throws IOException |
268 |
272 |
269 int numBufs = bufs.length; |
273 int numBufs = bufs.length; |
270 |
274 |
271 // Read into the shadow to ensure DirectByteBuffers are used |
275 // Read into the shadow to ensure DirectByteBuffers are used |
272 ByteBuffer[] shadow = new ByteBuffer[numBufs]; |
276 ByteBuffer[] shadow = new ByteBuffer[numBufs]; |
273 for (int i=0; i<numBufs; i++) { |
277 boolean usingSlowBuffers = false; |
274 if (bufs[i].isReadOnly()) |
|
275 throw new IllegalArgumentException("Read-only buffer"); |
|
276 if (!(bufs[i] instanceof DirectBuffer)) { |
|
277 shadow[i] = ByteBuffer.allocateDirect(bufs[i].remaining()); |
|
278 } else { |
|
279 shadow[i] = bufs[i]; |
|
280 } |
|
281 } |
|
282 |
|
283 IOVecWrapper vec = null; |
|
284 long bytesRead = 0; |
|
285 try { |
278 try { |
286 // Create a native iovec array |
279 for (int i=0; i<numBufs; i++) { |
287 vec = new IOVecWrapper(numBufs); |
280 if (bufs[i].isReadOnly()) |
288 |
281 throw new IllegalArgumentException("Read-only buffer"); |
289 // Fill in the iovec array with appropriate data |
282 if (!(bufs[i] instanceof DirectBuffer)) { |
|
283 shadow[i] = Util.getTemporaryDirectBuffer(bufs[i].remaining()); |
|
284 usingSlowBuffers = true; |
|
285 } else { |
|
286 shadow[i] = bufs[i]; |
|
287 } |
|
288 } |
|
289 |
|
290 IOVecWrapper vec = null; |
|
291 long bytesRead = 0; |
|
292 try { |
|
293 // Create a native iovec array |
|
294 vec = new IOVecWrapper(numBufs); |
|
295 |
|
296 // Fill in the iovec array with appropriate data |
|
297 for (int i=0; i<numBufs; i++) { |
|
298 ByteBuffer nextBuffer = shadow[i]; |
|
299 // put in the buffer addresses |
|
300 long pos = nextBuffer.position(); |
|
301 long len = nextBuffer.remaining(); |
|
302 vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos); |
|
303 vec.putLen(i, len); |
|
304 } |
|
305 |
|
306 // Invoke native call to fill the buffers |
|
307 bytesRead = nd.readv(fd, vec.address, numBufs); |
|
308 } finally { |
|
309 vec.free(); |
|
310 } |
|
311 long returnVal = bytesRead; |
|
312 |
|
313 // Notify the buffers how many bytes were read |
290 for (int i=0; i<numBufs; i++) { |
314 for (int i=0; i<numBufs; i++) { |
291 ByteBuffer nextBuffer = shadow[i]; |
315 ByteBuffer nextBuffer = shadow[i]; |
292 // put in the buffer addresses |
316 // Note: should this have been cached from above? |
293 long pos = nextBuffer.position(); |
317 int pos = nextBuffer.position(); |
294 long len = nextBuffer.remaining(); |
318 int len = nextBuffer.remaining(); |
295 vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos); |
319 if (bytesRead >= len) { |
296 vec.putLen(i, len); |
320 bytesRead -= len; |
297 } |
321 int newPosition = pos + len; |
298 |
322 nextBuffer.position(newPosition); |
299 // Invoke native call to fill the buffers |
323 } else { // Buffers not completely filled |
300 bytesRead = nd.readv(fd, vec.address, numBufs); |
324 if (bytesRead > 0) { |
|
325 assert(pos + bytesRead < (long)Integer.MAX_VALUE); |
|
326 int newPosition = (int)(pos + bytesRead); |
|
327 nextBuffer.position(newPosition); |
|
328 } |
|
329 break; |
|
330 } |
|
331 } |
|
332 |
|
333 // Put results from shadow into the slow buffers |
|
334 if (usingSlowBuffers) { |
|
335 for (int i=0; i<numBufs; i++) { |
|
336 if (!(bufs[i] instanceof DirectBuffer)) { |
|
337 shadow[i].flip(); |
|
338 bufs[i].put(shadow[i]); |
|
339 } |
|
340 } |
|
341 } |
|
342 return returnVal; |
301 } finally { |
343 } finally { |
302 vec.free(); |
344 // return any substituted buffers to cache |
303 } |
345 if (usingSlowBuffers) { |
304 long returnVal = bytesRead; |
346 for (int i=0; i<numBufs; i++) { |
305 |
347 ByteBuffer bb = shadow[i]; |
306 // Notify the buffers how many bytes were read |
348 if (bb != null && bb != bufs[i]) { |
307 for (int i=0; i<numBufs; i++) { |
349 Util.releaseTemporaryDirectBuffer(bb); |
308 ByteBuffer nextBuffer = shadow[i]; |
350 } |
309 // Note: should this have been cached from above? |
351 } |
310 int pos = nextBuffer.position(); |
352 } |
311 int len = nextBuffer.remaining(); |
353 } |
312 if (bytesRead >= len) { |
|
313 bytesRead -= len; |
|
314 int newPosition = pos + len; |
|
315 nextBuffer.position(newPosition); |
|
316 } else { // Buffers not completely filled |
|
317 if (bytesRead > 0) { |
|
318 assert(pos + bytesRead < (long)Integer.MAX_VALUE); |
|
319 int newPosition = (int)(pos + bytesRead); |
|
320 nextBuffer.position(newPosition); |
|
321 } |
|
322 break; |
|
323 } |
|
324 } |
|
325 |
|
326 // Put results from shadow into the slow buffers |
|
327 for (int i=0; i<numBufs; i++) { |
|
328 if (!(bufs[i] instanceof DirectBuffer)) { |
|
329 shadow[i].flip(); |
|
330 bufs[i].put(shadow[i]); |
|
331 } |
|
332 } |
|
333 |
|
334 return returnVal; |
|
335 } |
354 } |
336 |
355 |
337 static FileDescriptor newFD(int i) { |
356 static FileDescriptor newFD(int i) { |
338 FileDescriptor fd = new FileDescriptor(); |
357 FileDescriptor fd = new FileDescriptor(); |
339 setfdVal(fd, i); |
358 setfdVal(fd, i); |