--- a/jdk/src/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Aug 12 16:36:49 2010 -0700
+++ b/jdk/src/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Aug 12 16:47:13 2010 -0700
@@ -536,9 +536,11 @@
}
}
- private long read0(ByteBuffer[] bufs) throws IOException {
- if (bufs == null)
- throw new NullPointerException();
+ public long read(ByteBuffer[] dsts, int offset, int length)
+ throws IOException
+ {
+ if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+ throw new IndexOutOfBoundsException();
synchronized (readLock) {
synchronized (stateLock) {
ensureOpen();
@@ -552,7 +554,7 @@
return 0;
readerThread = NativeThread.current();
do {
- n = IOUtil.read(fd, bufs, nd);
+ n = IOUtil.read(fd, dsts, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@@ -563,15 +565,6 @@
}
}
- public long read(ByteBuffer[] dsts, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
- throw new IndexOutOfBoundsException();
- // ## Fix IOUtil.write so that we can avoid this array copy
- return read0(Util.subsequence(dsts, offset, length));
- }
-
public int write(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
@@ -599,9 +592,11 @@
}
}
- private long write0(ByteBuffer[] bufs) throws IOException {
- if (bufs == null)
- throw new NullPointerException();
+ public long write(ByteBuffer[] srcs, int offset, int length)
+ throws IOException
+ {
+ if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+ throw new IndexOutOfBoundsException();
synchronized (writeLock) {
synchronized (stateLock) {
ensureOpen();
@@ -615,7 +610,7 @@
return 0;
writerThread = NativeThread.current();
do {
- n = IOUtil.write(fd, bufs, nd);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@@ -626,15 +621,6 @@
}
}
- public long write(ByteBuffer[] srcs, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
- throw new IndexOutOfBoundsException();
- // ## Fix IOUtil.write so that we can avoid this array copy
- return write0(Util.subsequence(srcs, offset, length));
- }
-
protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block);
}
--- a/jdk/src/share/classes/sun/nio/ch/FileChannelImpl.java Thu Aug 12 16:36:49 2010 -0700
+++ b/jdk/src/share/classes/sun/nio/ch/FileChannelImpl.java Thu Aug 12 16:47:13 2010 -0700
@@ -143,7 +143,11 @@
}
}
- private long read0(ByteBuffer[] dsts) throws IOException {
+ public long read(ByteBuffer[] dsts, int offset, int length)
+ throws IOException
+ {
+ if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+ throw new IndexOutOfBoundsException();
ensureOpen();
if (!readable)
throw new NonReadableChannelException();
@@ -156,7 +160,7 @@
if (!isOpen())
return 0;
do {
- n = IOUtil.read(fd, dsts, nd);
+ n = IOUtil.read(fd, dsts, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@@ -167,15 +171,6 @@
}
}
- public long read(ByteBuffer[] dsts, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
- throw new IndexOutOfBoundsException();
- // ## Fix IOUtil.write so that we can avoid this array copy
- return read0(Util.subsequence(dsts, offset, length));
- }
-
public int write(ByteBuffer src) throws IOException {
ensureOpen();
if (!writable)
@@ -200,7 +195,11 @@
}
}
- private long write0(ByteBuffer[] srcs) throws IOException {
+ public long write(ByteBuffer[] srcs, int offset, int length)
+ throws IOException
+ {
+ if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+ throw new IndexOutOfBoundsException();
ensureOpen();
if (!writable)
throw new NonWritableChannelException();
@@ -213,7 +212,7 @@
if (!isOpen())
return 0;
do {
- n = IOUtil.write(fd, srcs, nd);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@@ -224,16 +223,6 @@
}
}
- public long write(ByteBuffer[] srcs, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
- throw new IndexOutOfBoundsException();
- // ## Fix IOUtil.write so that we can avoid this array copy
- return write0(Util.subsequence(srcs, offset, length));
- }
-
-
// -- Other operations --
public long position() throws IOException {
--- a/jdk/src/share/classes/sun/nio/ch/IOUtil.java Thu Aug 12 16:36:49 2010 -0700
+++ b/jdk/src/share/classes/sun/nio/ch/IOUtil.java Thu Aug 12 16:47:13 2010 -0700
@@ -38,34 +38,6 @@
private IOUtil() { } // No instantiation
- /*
- * Returns the index of first buffer in bufs with remaining,
- * or -1 if there is nothing left
- */
- private static int remaining(ByteBuffer[] bufs) {
- int numBufs = bufs.length;
- for (int i=0; i<numBufs; i++) {
- if (bufs[i].hasRemaining()) {
- return i;
- }
- }
- return -1;
- }
-
- /*
- * Returns a new ByteBuffer array with only unfinished buffers in it
- */
- private static ByteBuffer[] skipBufs(ByteBuffer[] bufs,
- int nextWithRemaining)
- {
- int newSize = bufs.length - nextWithRemaining;
- ByteBuffer[] temp = new ByteBuffer[newSize];
- for (int i=0; i<newSize; i++) {
- temp[i] = bufs[i + nextWithRemaining];
- }
- return temp;
- }
-
static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd, Object lock)
throws IOException
@@ -93,7 +65,7 @@
}
return n;
} finally {
- Util.releaseTemporaryDirectBuffer(bb);
+ Util.offerFirstTemporaryDirectBuffer(bb);
}
}
@@ -125,88 +97,81 @@
static long write(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd)
throws IOException
{
- int nextWithRemaining = remaining(bufs);
- // if all bufs are empty we should return immediately
- if (nextWithRemaining < 0)
- return 0;
- // If some bufs are empty we should skip them
- if (nextWithRemaining > 0)
- bufs = skipBufs(bufs, nextWithRemaining);
+ return write(fd, bufs, 0, bufs.length, nd);
+ }
- int numBufs = bufs.length;
+ static long write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
+ NativeDispatcher nd)
+ throws IOException
+ {
+ IOVecWrapper vec = IOVecWrapper.get(length);
- // Create shadow to ensure DirectByteBuffers are used
- ByteBuffer[] shadow = new ByteBuffer[numBufs];
+ boolean completed = false;
+ int iov_len = 0;
try {
- for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
- int pos = bufs[i].position();
- int lim = bufs[i].limit();
- assert (pos <= lim);
- int rem = (pos <= lim ? lim - pos : 0);
+
+ // Iterate over buffers to populate native iovec array.
+ int count = offset + length;
+ for (int i=offset; i<count; i++) {
+ ByteBuffer buf = bufs[i];
+ int pos = buf.position();
+ int lim = buf.limit();
+ assert (pos <= lim);
+ int rem = (pos <= lim ? lim - pos : 0);
+ if (rem > 0) {
+ vec.setBuffer(iov_len, buf, pos, rem);
+
+ // allocate shadow buffer to ensure I/O is done with direct buffer
+ if (!(buf instanceof DirectBuffer)) {
+ ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
+ shadow.put(buf);
+ shadow.flip();
+ vec.setShadow(iov_len, shadow);
+ buf.position(pos); // temporarily restore position in user buffer
+ buf = shadow;
+ pos = shadow.position();
+ }
- ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
- shadow[i] = bb;
- // Leave slow buffer position untouched; it will be updated
- // after we see how many bytes were really written out
- bb.put(bufs[i]);
- bufs[i].position(pos);
- bb.flip();
- } else {
- shadow[i] = bufs[i];
+ vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
+ vec.putLen(iov_len, rem);
+ iov_len++;
}
}
+ if (iov_len == 0)
+ return 0L;
+
+ long bytesWritten = nd.writev(fd, vec.address, iov_len);
+
+ // Notify the buffers how many bytes were taken
+ long left = bytesWritten;
+ for (int j=0; j<iov_len; j++) {
+ if (left > 0) {
+ ByteBuffer buf = vec.getBuffer(j);
+ int pos = vec.getPosition(j);
+ int rem = vec.getRemaining(j);
+ int n = (left > rem) ? rem : (int)left;
+ buf.position(pos + n);
+ left -= n;
+ }
+ // return shadow buffers to buffer pool
+ ByteBuffer shadow = vec.getShadow(j);
+ if (shadow != null)
+ Util.offerLastTemporaryDirectBuffer(shadow);
+ vec.clearRefs(j);
+ }
- IOVecWrapper vec = null;
- long bytesWritten = 0;
- try {
- // Create a native iovec array
- vec= new IOVecWrapper(numBufs);
-
- // Fill in the iovec array with appropriate data
- for (int i=0; i<numBufs; i++) {
- ByteBuffer nextBuffer = shadow[i];
- // put in the buffer addresses
- long pos = nextBuffer.position();
- long len = nextBuffer.limit() - pos;
- vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos);
- vec.putLen(i, len);
- }
-
- // Invoke native call to fill the buffers
- bytesWritten = nd.writev(fd, vec.address, numBufs);
- } finally {
- vec.free();
- }
- long returnVal = bytesWritten;
+ completed = true;
+ return bytesWritten;
- // Notify the buffers how many bytes were taken
- for (int i=0; i<numBufs; i++) {
- ByteBuffer nextBuffer = bufs[i];
- int pos = nextBuffer.position();
- int lim = nextBuffer.limit();
- assert (pos <= lim);
- int len = (pos <= lim ? lim - pos : lim);
- if (bytesWritten >= len) {
- bytesWritten -= len;
- int newPosition = pos + len;
- nextBuffer.position(newPosition);
- } else { // Buffers not completely filled
- if (bytesWritten > 0) {
- assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
- int newPosition = (int)(pos + bytesWritten);
- nextBuffer.position(newPosition);
- }
- break;
- }
- }
- return returnVal;
} finally {
- // return any substituted buffers to cache
- for (int i=0; i<numBufs; i++) {
- ByteBuffer bb = shadow[i];
- if (bb != null && bb != bufs[i]) {
- Util.releaseTemporaryDirectBuffer(bb);
+ // if an error occurred then clear refs to buffers and return any shadow
+ // buffers to cache
+ if (!completed) {
+ for (int j=0; j<iov_len; j++) {
+ ByteBuffer shadow = vec.getShadow(j);
+ if (shadow != null)
+ Util.offerLastTemporaryDirectBuffer(shadow);
+ vec.clearRefs(j);
}
}
}
@@ -231,7 +196,7 @@
dst.put(bb);
return n;
} finally {
- Util.releaseTemporaryDirectBuffer(bb);
+ Util.offerFirstTemporaryDirectBuffer(bb);
}
}
@@ -262,92 +227,85 @@
static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd)
throws IOException
{
- int nextWithRemaining = remaining(bufs);
- // if all bufs are empty we should return immediately
- if (nextWithRemaining < 0)
- return 0;
- // If some bufs are empty we should skip them
- if (nextWithRemaining > 0)
- bufs = skipBufs(bufs, nextWithRemaining);
+ return read(fd, bufs, 0, bufs.length, nd);
+ }
+
+ static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
+ NativeDispatcher nd)
+ throws IOException
+ {
+ IOVecWrapper vec = IOVecWrapper.get(length);
+
+ boolean completed = false;
+ int iov_len = 0;
+ try {
- int numBufs = bufs.length;
+ // Iterate over buffers to populate native iovec array.
+ int count = offset + length;
+ for (int i=offset; i<count; i++) {
+ ByteBuffer buf = bufs[i];
+ if (buf.isReadOnly())
+ throw new IllegalArgumentException("Read-only buffer");
+ int pos = buf.position();
+ int lim = buf.limit();
+ assert (pos <= lim);
+ int rem = (pos <= lim ? lim - pos : 0);
+
+ if (rem > 0) {
+ vec.setBuffer(iov_len, buf, pos, rem);
- // Read into the shadow to ensure DirectByteBuffers are used
- ByteBuffer[] shadow = new ByteBuffer[numBufs];
- boolean usingSlowBuffers = false;
- try {
- for (int i=0; i<numBufs; i++) {
- if (bufs[i].isReadOnly())
- throw new IllegalArgumentException("Read-only buffer");
- if (!(bufs[i] instanceof DirectBuffer)) {
- shadow[i] = Util.getTemporaryDirectBuffer(bufs[i].remaining());
- usingSlowBuffers = true;
- } else {
- shadow[i] = bufs[i];
+ // allocate shadow buffer to ensure I/O is done with direct buffer
+ if (!(buf instanceof DirectBuffer)) {
+ ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
+ vec.setShadow(iov_len, shadow);
+ buf = shadow;
+ pos = shadow.position();
+ }
+
+ vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
+ vec.putLen(iov_len, rem);
+ iov_len++;
}
}
+ if (iov_len == 0)
+ return 0L;
+
+ long bytesRead = nd.readv(fd, vec.address, iov_len);
+
+ // Notify the buffers how many bytes were read
+ long left = bytesRead;
+ for (int j=0; j<iov_len; j++) {
+ ByteBuffer shadow = vec.getShadow(j);
+ if (left > 0) {
+ ByteBuffer buf = vec.getBuffer(j);
+ int rem = vec.getRemaining(j);
+ int n = (left > rem) ? rem : (int)left;
+ if (shadow == null) {
+ int pos = vec.getPosition(j);
+ buf.position(pos + n);
+ } else {
+ shadow.limit(shadow.position() + n);
+ buf.put(shadow);
+ }
+ left -= n;
+ }
+ if (shadow != null)
+ Util.offerLastTemporaryDirectBuffer(shadow);
+ vec.clearRefs(j);
+ }
- IOVecWrapper vec = null;
- long bytesRead = 0;
- try {
- // Create a native iovec array
- vec = new IOVecWrapper(numBufs);
-
- // Fill in the iovec array with appropriate data
- for (int i=0; i<numBufs; i++) {
- ByteBuffer nextBuffer = shadow[i];
- // put in the buffer addresses
- long pos = nextBuffer.position();
- long len = nextBuffer.remaining();
- vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos);
- vec.putLen(i, len);
- }
-
- // Invoke native call to fill the buffers
- bytesRead = nd.readv(fd, vec.address, numBufs);
- } finally {
- vec.free();
- }
- long returnVal = bytesRead;
+ completed = true;
+ return bytesRead;
- // Notify the buffers how many bytes were read
- for (int i=0; i<numBufs; i++) {
- ByteBuffer nextBuffer = shadow[i];
- // Note: should this have been cached from above?
- int pos = nextBuffer.position();
- int len = nextBuffer.remaining();
- if (bytesRead >= len) {
- bytesRead -= len;
- int newPosition = pos + len;
- nextBuffer.position(newPosition);
- } else { // Buffers not completely filled
- if (bytesRead > 0) {
- assert(pos + bytesRead < (long)Integer.MAX_VALUE);
- int newPosition = (int)(pos + bytesRead);
- nextBuffer.position(newPosition);
- }
- break;
- }
- }
-
- // Put results from shadow into the slow buffers
- if (usingSlowBuffers) {
- for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
- shadow[i].flip();
- bufs[i].put(shadow[i]);
- }
- }
- }
- return returnVal;
} finally {
- // return any substituted buffers to cache
- if (usingSlowBuffers) {
- for (int i=0; i<numBufs; i++) {
- ByteBuffer bb = shadow[i];
- if (bb != null && bb != bufs[i]) {
- Util.releaseTemporaryDirectBuffer(bb);
- }
+ // if an error occurred then clear refs to buffers and return any shadow
+ // buffers to cache
+ if (!completed) {
+ for (int j=0; j<iov_len; j++) {
+ ByteBuffer shadow = vec.getShadow(j);
+ if (shadow != null)
+ Util.offerLastTemporaryDirectBuffer(shadow);
+ vec.clearRefs(j);
}
}
}
--- a/jdk/src/share/classes/sun/nio/ch/IOVecWrapper.java Thu Aug 12 16:36:49 2010 -0700
+++ b/jdk/src/share/classes/sun/nio/ch/IOVecWrapper.java Thu Aug 12 16:47:13 2010 -0700
@@ -25,6 +25,7 @@
package sun.nio.ch;
+import java.nio.ByteBuffer;
import sun.misc.*;
@@ -43,23 +44,98 @@
class IOVecWrapper {
// Miscellaneous constants
- static int BASE_OFFSET = 0;
- static int LEN_OFFSET;
- static int SIZE_IOVEC;
+ private static final int BASE_OFFSET = 0;
+ private static final int LEN_OFFSET;
+ private static final int SIZE_IOVEC;
// The iovec array
- private AllocatedNativeObject vecArray;
+ private final AllocatedNativeObject vecArray;
+
+ // Number of elements in iovec array
+ private final int size;
+
+ // Buffers and position/remaining corresponding to elements in iovec array
+ private final ByteBuffer[] buf;
+ private final int[] position;
+ private final int[] remaining;
+
+ // Shadow buffers for cases when original buffer is substituted
+ private final ByteBuffer[] shadow;
// Base address of this array
- long address;
+ final long address;
// Address size in bytes
static int addressSize;
- IOVecWrapper(int newSize) {
- newSize = (newSize + 1) * SIZE_IOVEC;
- vecArray = new AllocatedNativeObject(newSize, false);
- address = vecArray.address();
+ private static class Deallocator implements Runnable {
+ private final AllocatedNativeObject obj;
+ Deallocator(AllocatedNativeObject obj) {
+ this.obj = obj;
+ }
+ public void run() {
+ obj.free();
+ }
+ }
+
+ // per thread IOVecWrapper
+ private static final ThreadLocal<IOVecWrapper> cached =
+ new ThreadLocal<IOVecWrapper>();
+
+ private IOVecWrapper(int size) {
+ this.size = size;
+ this.buf = new ByteBuffer[size];
+ this.position = new int[size];
+ this.remaining = new int[size];
+ this.shadow = new ByteBuffer[size];
+ this.vecArray = new AllocatedNativeObject(size * SIZE_IOVEC, false);
+ this.address = vecArray.address();
+ }
+
+ static IOVecWrapper get(int size) {
+ IOVecWrapper wrapper = cached.get();
+ if (wrapper != null && wrapper.size < size) {
+ // not big enough; eagerly release memory
+ wrapper.vecArray.free();
+ wrapper = null;
+ }
+ if (wrapper == null) {
+ wrapper = new IOVecWrapper(size);
+ Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
+ cached.set(wrapper);
+ }
+ return wrapper;
+ }
+
+ void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
+ this.buf[i] = buf;
+ this.position[i] = pos;
+ this.remaining[i] = rem;
+ }
+
+ void setShadow(int i, ByteBuffer buf) {
+ shadow[i] = buf;
+ }
+
+ ByteBuffer getBuffer(int i) {
+ return buf[i];
+ }
+
+ int getPosition(int i) {
+ return position[i];
+ }
+
+ int getRemaining(int i) {
+ return remaining[i];
+ }
+
+ ByteBuffer getShadow(int i) {
+ return shadow[i];
+ }
+
+ void clearRefs(int i) {
+ buf[i] = null;
+ shadow[i] = null;
}
void putBase(int i, long base) {
@@ -78,10 +154,6 @@
vecArray.putLong(offset, len);
}
- void free() {
- vecArray.free();
- }
-
static {
addressSize = Util.unsafe().addressSize();
LEN_OFFSET = addressSize;
--- a/jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Aug 12 16:36:49 2010 -0700
+++ b/jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Aug 12 16:47:13 2010 -0700
@@ -385,9 +385,11 @@
}
}
- private long read0(ByteBuffer[] bufs) throws IOException {
- if (bufs == null)
- throw new NullPointerException();
+ public long read(ByteBuffer[] dsts, int offset, int length)
+ throws IOException
+ {
+ if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+ throw new IndexOutOfBoundsException();
synchronized (readLock) {
if (!ensureReadOpen())
return -1;
@@ -401,7 +403,7 @@
}
for (;;) {
- n = IOUtil.read(fd, bufs, nd);
+ n = IOUtil.read(fd, dsts, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
@@ -418,15 +420,6 @@
}
}
- public long read(ByteBuffer[] dsts, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
- throw new IndexOutOfBoundsException();
- // ## Fix IOUtil.write so that we can avoid this array copy
- return read0(Util.subsequence(dsts, offset, length));
- }
-
public int write(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
@@ -458,9 +451,11 @@
}
}
- public long write0(ByteBuffer[] bufs) throws IOException {
- if (bufs == null)
- throw new NullPointerException();
+ public long write(ByteBuffer[] srcs, int offset, int length)
+ throws IOException
+ {
+ if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+ throw new IndexOutOfBoundsException();
synchronized (writeLock) {
ensureWriteOpen();
long n = 0;
@@ -472,7 +467,7 @@
writerThread = NativeThread.current();
}
for (;;) {
- n = IOUtil.write(fd, bufs, nd);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
@@ -489,15 +484,6 @@
}
}
- public long write(ByteBuffer[] srcs, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
- throw new IndexOutOfBoundsException();
- // ## Fix IOUtil.write so that we can avoid this array copy
- return write0(Util.subsequence(srcs, offset, length));
- }
-
// package-private
int sendOutOfBandData(byte b) throws IOException {
synchronized (writeLock) {
--- a/jdk/src/share/classes/sun/nio/ch/Util.java Thu Aug 12 16:36:49 2010 -0700
+++ b/jdk/src/share/classes/sun/nio/ch/Util.java Thu Aug 12 16:47:13 2010 -0700
@@ -41,67 +41,180 @@
class Util {
-
// -- Caches --
// The number of temp buffers in our pool
- private static final int TEMP_BUF_POOL_SIZE = 3;
+ private static final int TEMP_BUF_POOL_SIZE = 8;
- // Per-thread soft cache of the last temporary direct buffer
- private static ThreadLocal<SoftReference<ByteBuffer>>[] bufferPool;
+ // Per-thread cache of temporary direct buffers
+ private static ThreadLocal<BufferCache> bufferCache =
+ new ThreadLocal<BufferCache>()
+ {
+ @Override
+ protected BufferCache initialValue() {
+ return new BufferCache();
+ }
+ };
+
+ /**
+ * A simple cache of direct buffers.
+ */
+ private static class BufferCache {
+ // the array of buffers
+ private ByteBuffer[] buffers;
- @SuppressWarnings("unchecked")
- static ThreadLocal<SoftReference<ByteBuffer>>[] createThreadLocalBufferPool() {
- return new ThreadLocal[TEMP_BUF_POOL_SIZE];
- }
+ // the number of buffers in the cache
+ private int count;
+
+ // the index of the first valid buffer (undefined if count == 0)
+ private int start;
+
+ private int next(int i) {
+ return (i + 1) % TEMP_BUF_POOL_SIZE;
+ }
+
+ BufferCache() {
+ buffers = new ByteBuffer[TEMP_BUF_POOL_SIZE];
+ }
+
+ /**
+ * Removes and returns a buffer from the cache of at least the given
+ * size (or null if no suitable buffer is found).
+ */
+ ByteBuffer get(int size) {
+ if (count == 0)
+ return null; // cache is empty
- static {
- bufferPool = createThreadLocalBufferPool();
- for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
- bufferPool[i] = new ThreadLocal<SoftReference<ByteBuffer>>();
- }
+ ByteBuffer[] buffers = this.buffers;
- static ByteBuffer getTemporaryDirectBuffer(int size) {
- ByteBuffer buf = null;
- // Grab a buffer if available
- for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
- SoftReference<ByteBuffer> ref = bufferPool[i].get();
- if ((ref != null) && ((buf = ref.get()) != null) &&
- (buf.capacity() >= size)) {
- buf.rewind();
- buf.limit(size);
- bufferPool[i].set(null);
- return buf;
+ // search for suitable buffer (often the first buffer will do)
+ ByteBuffer buf = buffers[start];
+ if (buf.capacity() < size) {
+ buf = null;
+ int i = start;
+ while ((i = next(i)) != start) {
+ ByteBuffer bb = buffers[i];
+ if (bb == null)
+ break;
+ if (bb.capacity() >= size) {
+ buf = bb;
+ break;
+ }
+ }
+ if (buf == null)
+ return null;
+ // move first element to here to avoid re-packing
+ buffers[i] = buffers[start];
+ }
+
+ // remove first element
+ buffers[start] = null;
+ start = next(start);
+ count--;
+
+ // prepare the buffer and return it
+ buf.rewind();
+ buf.limit(size);
+ return buf;
+ }
+
+ boolean offerFirst(ByteBuffer buf) {
+ if (count >= TEMP_BUF_POOL_SIZE) {
+ return false;
+ } else {
+ start = (start + TEMP_BUF_POOL_SIZE - 1) % TEMP_BUF_POOL_SIZE;
+ buffers[start] = buf;
+ count++;
+ return true;
}
}
- // Make a new one
- return ByteBuffer.allocateDirect(size);
- }
-
- static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
- if (buf == null)
- return;
- // Put it in an empty slot if such exists
- for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
- SoftReference<ByteBuffer> ref = bufferPool[i].get();
- if ((ref == null) || (ref.get() == null)) {
- bufferPool[i].set(new SoftReference<ByteBuffer>(buf));
- return;
- }
- }
- // Otherwise replace a smaller one in the cache if such exists
- for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
- SoftReference<ByteBuffer> ref = bufferPool[i].get();
- ByteBuffer inCacheBuf = ref.get();
- if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
- bufferPool[i].set(new SoftReference<ByteBuffer>(buf));
- return;
+ boolean offerLast(ByteBuffer buf) {
+ if (count >= TEMP_BUF_POOL_SIZE) {
+ return false;
+ } else {
+ int next = (start + count) % TEMP_BUF_POOL_SIZE;
+ buffers[next] = buf;
+ count++;
+ return true;
}
}
- // release memory
- ((DirectBuffer)buf).cleaner().clean();
+ boolean isEmpty() {
+ return count == 0;
+ }
+
+ ByteBuffer removeFirst() {
+ assert count > 0;
+ ByteBuffer buf = buffers[start];
+ buffers[start] = null;
+ start = next(start);
+ count--;
+ return buf;
+ }
+ }
+
+ /**
+ * Returns a temporary buffer of at least the given size
+ */
+ static ByteBuffer getTemporaryDirectBuffer(int size) {
+ BufferCache cache = bufferCache.get();
+ ByteBuffer buf = cache.get(size);
+ if (buf != null) {
+ return buf;
+ } else {
+ // No suitable buffer in the cache so we need to allocate a new
+ // one. To avoid the cache growing then we remove the first
+ // buffer from the cache and free it.
+ if (!cache.isEmpty()) {
+ buf = cache.removeFirst();
+ free(buf);
+ }
+ return ByteBuffer.allocateDirect(size);
+ }
+ }
+
+ /**
+ * Releases a temporary buffer by returning to the cache or freeing it.
+ */
+ static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
+ offerFirstTemporaryDirectBuffer(buf);
+ }
+
+ /**
+ * Releases a temporary buffer by returning to the cache or freeing it. If
+ * returning to the cache then insert it at the start so that it is
+ * likely to be returned by a subsequent call to getTemporaryDirectBuffer.
+ */
+ static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
+ assert buf != null;
+ BufferCache cache = bufferCache.get();
+ if (!cache.offerFirst(buf)) {
+ // cache is full
+ free(buf);
+ }
+ }
+
+ /**
+ * Releases a temporary buffer by returning to the cache or freeing it. If
+ * returning to the cache then insert it at the end. This makes it
+ * suitable for scatter/gather operations where the buffers are returned to
+ * cache in same order that they were obtained.
+ */
+ static void offerLastTemporaryDirectBuffer(ByteBuffer buf) {
+ assert buf != null;
+ BufferCache cache = bufferCache.get();
+ if (!cache.offerLast(buf)) {
+ // cache is full
+ free(buf);
+ }
+ }
+
+ /**
+ * Frees the memory for the given direct buffer
+ */
+ private static void free(ByteBuffer buf) {
+ ((DirectBuffer)buf).cleaner().clean();
}
private static class SelectorWrapper {