--- a/jdk/src/share/classes/sun/nio/ch/AsynchronousSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/AsynchronousSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -184,28 +184,53 @@
}
/**
+ * Invoked by connect to initiate the connect operation.
+ */
+ abstract <A> Future<Void> implConnect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler);
+
+ @Override
+ public final Future<Void> connect(SocketAddress remote) {
+ return implConnect(remote, null, null);
+ }
+
+ @Override
+ public final <A> void connect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implConnect(remote, attachment, handler);
+ }
+
+ /**
* Invoked by read to initiate the I/O operation.
*/
- abstract <V extends Number,A> Future<V> readImpl(ByteBuffer[] dsts,
- boolean isScatteringRead,
+ abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
- private <V extends Number,A> Future<V> read(ByteBuffer[] dsts,
- boolean isScatteringRead,
+ private <V extends Number,A> Future<V> read(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
- A attachment,
+ A att,
CompletionHandler<V,? super A> handler)
{
if (!isOpen()) {
- CompletedFuture<V,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable e = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(e);
+ Invoker.invoke(this, handler, att, null, e);
+ return null;
}
if (remoteAddress == null)
@@ -213,13 +238,13 @@
if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
- boolean hasSpaceToRead = isScatteringRead || dsts[0].hasRemaining();
+ boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
boolean shutdown = false;
// check and update state
synchronized (readLock) {
if (readKilled)
- throw new RuntimeException("Reading not allowed due to timeout or cancellation");
+ throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
if (reading)
throw new ReadPendingException();
if (readShutdown) {
@@ -234,44 +259,53 @@
// immediately complete with -1 if shutdown for read
// immediately complete with 0 if no space remaining
if (shutdown || !hasSpaceToRead) {
- CompletedFuture<V,A> result;
+ Number result;
if (isScatteringRead) {
- Long value = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, value, attachment);
+ result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
} else {
- int value = (shutdown) ? -1 : 0;
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, value, attachment);
+ result = (shutdown) ? -1 : 0;
}
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withResult((V)result);
+ Invoker.invoke(this, handler, att, (V)result, null);
+ return null;
}
- return readImpl(dsts, isScatteringRead, timeout, unit, attachment, handler);
+ return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
+ }
+
+ @Override
+ public final Future<Integer> read(ByteBuffer dst) {
+ if (dst.isReadOnly())
+ throw new IllegalArgumentException("Read-only buffer");
+ return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
- public final <A> Future<Integer> read(ByteBuffer dst,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void read(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
- ByteBuffer[] bufs = new ByteBuffer[1];
- bufs[0] = dst;
- return read(bufs, false, timeout, unit, attachment, handler);
+ read(false, dst, null, timeout, unit, attachment, handler);
}
@Override
- public final <A> Future<Long> read(ByteBuffer[] dsts,
- int offset,
- int length,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Long,? super A> handler)
+ public final <A> void read(ByteBuffer[] dsts,
+ int offset,
+ int length,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Long,? super A> handler)
{
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
@@ -279,39 +313,41 @@
if (bufs[i].isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
}
- return read(bufs, true, timeout, unit, attachment, handler);
+ read(true, null, bufs, timeout, unit, attachment, handler);
}
/**
* Invoked by write to initiate the I/O operation.
*/
- abstract <V extends Number,A> Future<V> writeImpl(ByteBuffer[] srcs,
- boolean isGatheringWrite,
+ abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
- private <V extends Number,A> Future<V> write(ByteBuffer[] srcs,
- boolean isGatheringWrite,
+ private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
- A attachment,
+ A att,
CompletionHandler<V,? super A> handler)
{
- boolean hasDataToWrite = isGatheringWrite || srcs[0].hasRemaining();
+ boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
boolean closed = false;
if (isOpen()) {
if (remoteAddress == null)
throw new NotYetConnectedException();
- if (timeout < 0L)
+ if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
// check and update state
synchronized (writeLock) {
if (writeKilled)
- throw new RuntimeException("Writing not allowed due to timeout or cancellation");
+ throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
if (writing)
throw new WritePendingException();
if (writeShutdown) {
@@ -327,52 +363,57 @@
// channel is closed or shutdown for write
if (closed) {
- CompletedFuture<V,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable e = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(e);
+ Invoker.invoke(this, handler, att, null, e);
+ return null;
}
// nothing to write so complete immediately
if (!hasDataToWrite) {
- CompletedFuture<V,A> result;
- if (isGatheringWrite) {
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, 0L, attachment);
- } else {
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, 0, attachment);
- }
- Invoker.invoke(handler, result);
- return result;
+ Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
+ if (handler == null)
+ return CompletedFuture.withResult((V)result);
+ Invoker.invoke(this, handler, att, (V)result, null);
+ return null;
}
- return writeImpl(srcs, isGatheringWrite, timeout, unit, attachment, handler);
+ return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
+ }
+
+ @Override
+ public final Future<Integer> write(ByteBuffer src) {
+ return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
- public final <A> Future<Integer> write(ByteBuffer src,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void write(ByteBuffer src,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- ByteBuffer[] bufs = new ByteBuffer[1];
- bufs[0] = src;
- return write(bufs, false, timeout, unit, attachment, handler);
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ write(false, src, null, timeout, unit, attachment, handler);
}
@Override
- public final <A> Future<Long> write(ByteBuffer[] srcs,
- int offset,
- int length,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Long,? super A> handler)
+ public final <A> void write(ByteBuffer[] srcs,
+ int offset,
+ int length,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Long,? super A> handler)
{
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
srcs = Util.subsequence(srcs, offset, length);
- return write(srcs, true, timeout, unit, attachment, handler);
+ write(true, null, srcs, timeout, unit, attachment, handler);
}
@Override
@@ -461,7 +502,6 @@
}
@Override
- @SuppressWarnings("unchecked")
public final SocketAddress getRemoteAddress() throws IOException {
if (!isOpen())
throw new ClosedChannelException();