6842687: New I/O: Update Asynchronous I/O API to jsr203/nio2-b101
Reviewed-by: sherman
--- a/jdk/make/java/nio/FILES_java.gmk Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/make/java/nio/FILES_java.gmk Sun Aug 23 12:53:45 2009 +0100
@@ -160,7 +160,6 @@
\
sun/nio/ByteBuffered.java \
\
- sun/nio/ch/AbstractFuture.java \
sun/nio/ch/AbstractPollArrayWrapper.java \
sun/nio/ch/AllocatedNativeObject.java \
sun/nio/ch/AsynchronousChannelGroupImpl.java \
--- a/jdk/src/share/classes/java/nio/channels/AsynchronousByteChannel.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/AsynchronousByteChannel.java Sun Aug 23 12:53:45 2009 +0100
@@ -56,18 +56,18 @@
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
- * <p> This method initiates an operation to read a sequence of bytes from
- * this channel into the given buffer. The method returns a {@link Future}
- * representing the pending result of the operation. The result of the
- * operation, obtained by invoking the {@code Future} 's {@link
- * Future#get() get} method, is the number of bytes read or {@code -1} if
- * all bytes have been read and the channel has reached end-of-stream.
+ * <p> This method initiates an asynchronous read operation to read a
+ * sequence of bytes from this channel into the given buffer. The {@code
+ * handler} parameter is a completion handler that is invoked when the read
+ * operation completes (or fails). The result passed to the completion
+ * handler is the number of bytes read or {@code -1} if no bytes could be
+ * read because the channel has reached end-of-stream.
*
- * <p> This method initiates a read operation to read up to <i>r</i> bytes
- * from the channel, where <i>r</i> is the number of bytes remaining in the
- * buffer, that is, {@code dst.remaining()} at the time that the read is
- * attempted. Where <i>r</i> is 0, the read operation completes immediately
- * with a result of {@code 0} without initiating an I/O operation.
+ * <p> The read operation may read up to <i>r</i> bytes from the channel,
+ * where <i>r</i> is the number of bytes remaining in the buffer, that is,
+ * {@code dst.remaining()} at the time that the read is attempted. Where
+ * <i>r</i> is 0, the read operation completes immediately with a result of
+ * {@code 0} without initiating an I/O operation.
*
* <p> Suppose that a byte sequence of length <i>n</i> is read, where
* <tt>0</tt> <tt><</tt> <i>n</i> <tt><=</tt> <i>r</i>.
@@ -79,44 +79,46 @@
* <i>p</i> <tt>+</tt> <i>n</i>; its limit will not have changed.
*
* <p> Buffers are not safe for use by multiple concurrent threads so care
- * should be taken to not to access the buffer until the operaton has completed.
+ * should be taken to not access the buffer until the operation has
+ * completed.
*
* <p> This method may be invoked at any time. Some channel types may not
* allow more than one read to be outstanding at any given time. If a thread
* initiates a read operation before a previous read operation has
* completed then a {@link ReadPendingException} will be thrown.
*
- * <p> The <tt>handler</tt> parameter is used to specify a {@link
- * CompletionHandler}. When the read operation completes the handler's
- * {@link CompletionHandler#completed completed} method is executed.
- *
- *
* @param dst
* The buffer into which bytes are to be transferred
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The completion handler object; can be {@code null}
- *
- * @return A Future representing the result of the operation
+ * The completion handler
*
* @throws IllegalArgumentException
* If the buffer is read-only
* @throws ReadPendingException
* If the channel does not allow more than one read to be outstanding
* and a previous read has not completed
+ * @throws ShutdownChannelGroupException
+ * If the channel is associated with a {@link AsynchronousChannelGroup
+ * group} that has terminated
*/
- <A> Future<Integer> read(ByteBuffer dst,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ <A> void read(ByteBuffer dst,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
- * <p> An invocation of this method of the form <tt>c.read(dst)</tt>
- * behaves in exactly the same manner as the invocation
- * <blockquote><pre>
- * c.read(dst, null, null);</pre></blockquote>
+ * <p> This method initiates an asynchronous read operation to read a
+ * sequence of bytes from this channel into the given buffer. The method
+ * behaves in exactly the same manner as the {@link
+ * #read(ByteBuffer,Object,CompletionHandler)
+ * read(ByteBuffer,Object,CompletionHandler)} method except that instead
+ * of specifying a completion handler, this method returns a {@code Future}
+ * representing the pending result. The {@code Future}'s {@link Future#get()
+ * get} method returns the number of bytes read or {@code -1} if no bytes
+ * could be read because the channel has reached end-of-stream.
*
* @param dst
* The buffer into which bytes are to be transferred
@@ -134,17 +136,17 @@
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
- * <p> This method initiates an operation to write a sequence of bytes to
- * this channel from the given buffer. This method returns a {@link
- * Future} representing the pending result of the operation. The result
- * of the operation, obtained by invoking the <tt>Future</tt>'s {@link
- * Future#get() get} method, is the number of bytes written, possibly zero.
+ * <p> This method initiates an asynchronous write operation to write a
+ * sequence of bytes to this channel from the given buffer. The {@code
+ * handler} parameter is a completion handler that is invoked when the write
+ * operation completes (or fails). The result passed to the completion
+ * handler is the number of bytes written.
*
- * <p> This method initiates a write operation to write up to <i>r</i> bytes
- * to the channel, where <i>r</i> is the number of bytes remaining in the
- * buffer, that is, {@code src.remaining()} at the moment the write is
- * attempted. Where <i>r</i> is 0, the write operation completes immediately
- * with a result of {@code 0} without initiating an I/O operation.
+ * <p> The write operation may write up to <i>r</i> bytes to the channel,
+ * where <i>r</i> is the number of bytes remaining in the buffer, that is,
+ * {@code src.remaining()} at the time that the write is attempted. Where
+ * <i>r</i> is 0, the write operation completes immediately with a result of
+ * {@code 0} without initiating an I/O operation.
*
* <p> Suppose that a byte sequence of length <i>n</i> is written, where
* <tt>0</tt> <tt><</tt> <i>n</i> <tt><=</tt> <i>r</i>.
@@ -156,41 +158,43 @@
* <i>p</i> <tt>+</tt> <i>n</i>; its limit will not have changed.
*
* <p> Buffers are not safe for use by multiple concurrent threads so care
- * should be taken to not to access the buffer until the operaton has completed.
+ * should be taken to not access the buffer until the operation has
+ * completed.
*
* <p> This method may be invoked at any time. Some channel types may not
* allow more than one write to be outstanding at any given time. If a thread
* initiates a write operation before a previous write operation has
* completed then a {@link WritePendingException} will be thrown.
*
- * <p> The <tt>handler</tt> parameter is used to specify a {@link
- * CompletionHandler}. When the write operation completes the handler's
- * {@link CompletionHandler#completed completed} method is executed.
- *
* @param src
* The buffer from which bytes are to be retrieved
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The completion handler object; can be {@code null}
- *
- * @return A Future representing the result of the operation
+ * The completion handler object
*
* @throws WritePendingException
* If the channel does not allow more than one write to be outstanding
* and a previous write has not completed
+ * @throws ShutdownChannelGroupException
+ * If the channel is associated with a {@link AsynchronousChannelGroup
+ * group} that has terminated
*/
- <A> Future<Integer> write(ByteBuffer src,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ <A> void write(ByteBuffer src,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
- * <p> An invocation of this method of the form <tt>c.write(src)</tt>
- * behaves in exactly the same manner as the invocation
- * <blockquote><pre>
- * c.write(src, null, null);</pre></blockquote>
+ * <p> This method initiates an asynchronous write operation to write a
+ * sequence of bytes to this channel from the given buffer. The method
+ * behaves in exactly the same manner as the {@link
+ * #write(ByteBuffer,Object,CompletionHandler)
+ * write(ByteBuffer,Object,CompletionHandler)} method except that instead
+ * of specifying a completion handler, this method returns a {@code Future}
+ * representing the pending result. The {@code Future}'s {@link Future#get()
+ * get} method returns the number of bytes written.
*
* @param src
* The buffer from which bytes are to be retrieved
--- a/jdk/src/share/classes/java/nio/channels/AsynchronousChannel.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/AsynchronousChannel.java Sun Aug 23 12:53:45 2009 +0100
@@ -34,7 +34,8 @@
*
* <ol>
* <li><pre>{@link Future}<V> <em>operation</em>(<em>...</em>)</pre></li>
- * <li><pre>Future<V> <em>operation</em>(<em>...</em> A attachment, {@link CompletionHandler}<V,? super A> handler)</pre></li>
+ * <li><pre>void <em>operation</em>(<em>...</em> A attachment, {@link
+ * CompletionHandler}<V,? super A> handler)</pre></li>
* </ol>
*
* where <i>operation</i> is the name of the I/O operation (read or write for
@@ -48,7 +49,7 @@
* interface may be used to check if the operation has completed, wait for its
* completion, and to retrieve the result. In the second form, a {@link
* CompletionHandler} is invoked to consume the result of the I/O operation when
- * it completes, fails, or is cancelled.
+ * it completes or fails.
*
* <p> A channel that implements this interface is <em>asynchronously
* closeable</em>: If an I/O operation is outstanding on the channel and the
@@ -63,33 +64,33 @@
* <h4>Cancellation</h4>
*
* <p> The {@code Future} interface defines the {@link Future#cancel cancel}
- * method to cancel execution of a task.
- *
- * <p> Where the {@code cancel} method is invoked with the {@code
- * mayInterruptIfRunning} parameter set to {@code true} then the I/O operation
- * may be interrupted by closing the channel. This will cause any other I/O
- * operations outstanding on the channel to complete with the exception {@link
- * AsynchronousCloseException}.
+ * method to cancel execution. This causes all threads waiting on the result of
+ * the I/O operation to throw {@link java.util.concurrent.CancellationException}.
+ * Whether the underlying I/O operation can be cancelled is highly implementation
+ * specific and therefore not specified. Where cancellation leaves the channel,
+ * or the entity to which it is connected, in an inconsistent state, then the
+ * channel is put into an implementation specific <em>error state</em> that
+ * prevents further attempts to initiate I/O operations that are <i>similar</i>
+ * to the operation that was cancelled. For example, if a read operation is
+ * cancelled but the implementation cannot guarantee that bytes have not been
+ * read from the channel then it puts the channel into an error state; further
+ * attempts to initiate a {@code read} operation cause an unspecified runtime
+ * exception to be thrown. Similarly, if a write operation is cancelled but the
+ * implementation cannot guarantee that bytes have not been written to the
+ * channel then subsequent attempts to initiate a {@code write} will fail with
+ * an unspecified runtime exception.
*
- * <p> If a {@code CompletionHandler} is specified when initiating an I/O
- * operation, and the {@code cancel} method is invoked to cancel the I/O
- * operation before it completes, then the {@code CompletionHandler}'s {@link
- * CompletionHandler#cancelled cancelled} method is invoked.
- *
- * <p> If an implementation of this interface supports a means to cancel I/O
- * operations, and where cancellation may leave the channel, or the entity to
- * which it is connected, in an inconsistent state, then the channel is put into
- * an implementation specific <em>error state</em> that prevents further
- * attempts to initiate I/O operations on the channel. For example, if a read
- * operation is cancelled but the implementation cannot guarantee that bytes
- * have not been read from the channel then it puts the channel into error state
- * state; further attempts to initiate a {@code read} operation causes an
- * unspecified runtime exception to be thrown.
+ * <p> Where the {@link Future#cancel cancel} method is invoked with the {@code
+ * mayInterruptIfRunning} parameter set to {@code true} then the I/O operation
+ * may be interrupted by closing the channel. In that case all threads waiting
+ * on the result of the I/O operation throw {@code CancellationException} and
+ * any other I/O operations outstanding on the channel complete with the
+ * exception {@link AsynchronousCloseException}.
*
* <p> Where the {@code cancel} method is invoked to cancel read or write
- * operations then it recommended that all buffers used in the I/O operations be
- * discarded or care taken to ensure that the buffers are not accessed while the
- * channel remains open.
+ * operations then it is recommended that all buffers used in the I/O operations
+ * be discarded or care taken to ensure that the buffers are not accessed while
+ * the channel remains open.
*
* @since 1.7
*/
@@ -102,7 +103,7 @@
*
* <p> Any outstanding asynchronous operations upon this channel will
* complete with the exception {@link AsynchronousCloseException}. After a
- * channel is closed then further attempts to initiate asynchronous I/O
+ * channel is closed, further attempts to initiate asynchronous I/O
* operations complete immediately with cause {@link ClosedChannelException}.
*
* <p> This method otherwise behaves exactly as specified by the {@link
--- a/jdk/src/share/classes/java/nio/channels/AsynchronousDatagramChannel.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/AsynchronousDatagramChannel.java Sun Aug 23 12:53:45 2009 +0100
@@ -109,19 +109,13 @@
* // print the source address of all packets that we receive
* dc.receive(buffer, buffer, new CompletionHandler<SocketAddress,ByteBuffer>() {
* public void completed(SocketAddress sa, ByteBuffer buffer) {
- * try {
- * System.out.println(sa);
- *
- * buffer.clear();
- * dc.receive(buffer, buffer, this);
- * } catch (...) { ... }
+ * System.out.println(sa);
+ * buffer.clear();
+ * dc.receive(buffer, buffer, this);
* }
* public void failed(Throwable exc, ByteBuffer buffer) {
* ...
* }
- * public void cancelled(ByteBuffer buffer) {
- * ...
- * }
* });
* </pre>
*
@@ -314,10 +308,10 @@
/**
* Receives a datagram via this channel.
*
- * <p> This method initiates the receiving of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The {@code Future}'s {@link Future#get() get} method returns
- * the source address of the datagram upon successful completion.
+ * <p> This method initiates the receiving of a datagram into the given
+ * buffer. The {@code handler} parameter is a completion handler that is
+ * invoked when the receive operation completes (or fails). The result
+ * passed to the completion handler is the datagram's source address.
*
* <p> The datagram is transferred into the given byte buffer starting at
* its current position, as if by a regular {@link AsynchronousByteChannel#read
@@ -350,28 +344,26 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return a {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the timeout is negative or the buffer is read-only
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<SocketAddress> receive(ByteBuffer dst,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<SocketAddress,? super A> handler);
+ public abstract <A> void receive(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<SocketAddress,? super A> handler);
/**
* Receives a datagram via this channel.
*
- * <p> This method initiates the receiving of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The {@code Future}'s {@link Future#get() get} method returns
- * the source address of the datagram upon successful completion.
+ * <p> This method initiates the receiving of a datagram into the given
+ * buffer. The {@code handler} parameter is a completion handler that is
+ * invoked when the receive operation completes (or fails). The result
+ * passed to the completion handler is the datagram's source address.
*
* <p> This method is equivalent to invoking {@link
* #receive(ByteBuffer,long,TimeUnit,Object,CompletionHandler)} with a
@@ -382,34 +374,30 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return a {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the buffer is read-only
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public final <A> Future<SocketAddress> receive(ByteBuffer dst,
- A attachment,
- CompletionHandler<SocketAddress,? super A> handler)
+ public final <A> void receive(ByteBuffer dst,
+ A attachment,
+ CompletionHandler<SocketAddress,? super A> handler)
{
- return receive(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
+ receive(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
/**
* Receives a datagram via this channel.
*
- * <p> This method initiates the receiving of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The {@code Future}'s {@link Future#get() get} method returns
- * the source address of the datagram upon successful completion.
- *
- * <p> This method is equivalent to invoking {@link
- * #receive(ByteBuffer,long,TimeUnit,Object,CompletionHandler)} with a
- * timeout of {@code 0L}, and an attachment and completion handler
- * of {@code null}.
+ * <p> This method initiates the receiving of a datagram into the given
+ * buffer. The method behaves in exactly the same manner as the {@link
+ * #receive(ByteBuffer,Object,CompletionHandler)
+ * receive(ByteBuffer,Object,CompletionHandler)} method except that instead
+ * of specifying a completion handler, this method returns a {@code Future}
+ * representing the pending result. The {@code Future}'s {@link Future#get()
+ * get} method returns the datagram's source address.
*
* @param dst
* The buffer into which the datagram is to be transferred
@@ -419,84 +407,19 @@
* @throws IllegalArgumentException
* If the buffer is read-only
*/
- public final <A> Future<SocketAddress> receive(ByteBuffer dst) {
- return receive(dst, 0L, TimeUnit.MILLISECONDS, null, null);
- }
+ public abstract Future<SocketAddress> receive(ByteBuffer dst);
/**
* Sends a datagram via this channel.
*
- * <p> This method initiates sending of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The operation sends the remaining bytes in the given buffer as a single
- * datagram to the given target address. The result of the operation, obtained
- * by invoking the {@code Future}'s {@link Future#get() get}
- * method, is the number of bytes sent.
- *
- * <p> The datagram is transferred from the byte buffer as if by a regular
- * {@link AsynchronousByteChannel#write write} operation.
- *
- * <p> If a timeout is specified and the timeout elapses before the operation
- * completes then the operation completes with the exception {@link
- * InterruptedByTimeoutException}. When a timeout elapses then the state of
- * the {@link ByteBuffer} is not defined. The buffers should be discarded or
- * at least care must be taken to ensure that the buffer is not accessed
- * while the channel remains open.
- *
- * <p> If there is a security manager installed and the channel is not
- * connected then this method verifies that the target address and port number
- * are permitted by the security manager's {@link SecurityManager#checkConnect
- * checkConnect} method. The overhead of this security check can be avoided
- * by first connecting the socket via the {@link #connect connect} method.
- *
- * @param src
- * The buffer containing the datagram to be sent
- * @param target
- * The address to which the datagram is to be sent
- * @param timeout
- * The timeout, or {@code 0L} for no timeout
- * @param unit
- * The time unit of the {@code timeout} argument
- * @param attachment
- * The object to attach to the I/O operation; can be {@code null}
- * @param handler
- * The handler for consuming the result; can be {@code null}
+ * <p> This method initiates sending of a datagram from the given buffer to
+ * the given address. The {@code handler} parameter is a completion handler
+ * that is invoked when the send completes (or fails). The result passed to
+ * the completion handler is the number of bytes sent.
*
- * @return a {@code Future} object representing the pending result
- *
- * @throws UnresolvedAddressException
- * If the given remote address is not fully resolved
- * @throws UnsupportedAddressTypeException
- * If the type of the given remote address is not supported
- * @throws IllegalArgumentException
- * If the timeout is negative, or if the channel's socket is
- * connected to an address that is not equal to {@code target}
- * @throws SecurityException
- * If a security manager has been installed and it does not permit
- * datagrams to be sent to the given address
- * @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
- */
- public abstract <A> Future<Integer> send(ByteBuffer src,
- SocketAddress target,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
-
- /**
- * Sends a datagram via this channel.
- *
- * <p> This method initiates sending of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The operation sends the remaining bytes in the given buffer as a single
- * datagram to the given target address. The result of the operation, obtained
- * by invoking the {@code Future}'s {@link Future#get() get}
- * method, is the number of bytes sent.
- *
- * <p> This method is equivalent to invoking {@link
- * #send(ByteBuffer,SocketAddress,long,TimeUnit,Object,CompletionHandler)}
- * with a timeout of {@code 0L}.
+ * <p> Otherwise this method works in the same manner as the {@link
+ * AsynchronousByteChannel#write(ByteBuffer,Object,CompletionHandler)}
+ * method.
*
* @param src
* The buffer containing the datagram to be sent
@@ -505,9 +428,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return a {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws UnresolvedAddressException
* If the given remote address is not fully resolved
@@ -520,30 +441,23 @@
* If a security manager has been installed and it does not permit
* datagrams to be sent to the given address
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public final <A> Future<Integer> send(ByteBuffer src,
- SocketAddress target,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
- {
- return send(src, target, 0L, TimeUnit.MILLISECONDS, attachment, handler);
- }
+ public abstract <A> void send(ByteBuffer src,
+ SocketAddress target,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* Sends a datagram via this channel.
*
- * <p> This method initiates sending of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The operation sends the remaining bytes in the given buffer as a single
- * datagram to the given target address. The result of the operation, obtained
- * by invoking the {@code Future}'s {@link Future#get() get}
- * method, is the number of bytes sent.
- *
- * <p> This method is equivalent to invoking {@link
- * #send(ByteBuffer,SocketAddress,long,TimeUnit,Object,CompletionHandler)}
- * with a timeout of {@code 0L} and an attachment and completion handler
- * of {@code null}.
+ * <p> This method initiates sending of a datagram from the given buffer to
+ * the given address. The method behaves in exactly the same manner as the
+ * {@link #send(ByteBuffer,SocketAddress,Object,CompletionHandler)
+ * send(ByteBuffer,SocketAddress,Object,CompletionHandler)} method except
+ * that instead of specifying a completion handler, this method returns a
+ * {@code Future} representing the pending result. The {@code Future}'s
+ * {@link Future#get() get} method returns the number of bytes sent.
*
* @param src
* The buffer containing the datagram to be sent
@@ -563,17 +477,15 @@
* If a security manager has been installed and it does not permit
* datagrams to be sent to the given address
*/
- public final Future<Integer> send(ByteBuffer src, SocketAddress target) {
- return send(src, target, 0L, TimeUnit.MILLISECONDS, null, null);
- }
+ public abstract Future<Integer> send(ByteBuffer src, SocketAddress target);
/**
* Receives a datagram via this channel.
*
- * <p> This method initiates the receiving of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The {@code Future}'s {@link Future#get() get} method returns
- * the number of bytes transferred upon successful completion.
+ * <p> This method initiates the receiving of a datagram into the given
+ * buffer. The {@code handler} parameter is a completion handler that is
+ * invoked when the receive operation completes (or fails). The result
+ * passed to the completion handler is number of bytes read.
*
* <p> This method may only be invoked if this channel is connected, and it
* only accepts datagrams from the peer that the channel is connected too.
@@ -599,120 +511,62 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return a {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the timeout is negative or buffer is read-only
* @throws NotYetConnectedException
* If this channel is not connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<Integer> read(ByteBuffer dst,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ public abstract <A> void read(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* @throws NotYetConnectedException
* If this channel is not connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
@Override
- public final <A> Future<Integer> read(ByteBuffer dst,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void read(ByteBuffer dst,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- return read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
+ read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
/**
* @throws NotYetConnectedException
* If this channel is not connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
@Override
- public final Future<Integer> read(ByteBuffer dst) {
- return read(dst, 0L, TimeUnit.MILLISECONDS, null, null);
- }
-
- /**
- * Writes a datagram to this channel.
- *
- * <p> This method initiates sending of a datagram, returning a
- * {@code Future} representing the pending result of the operation.
- * The operation sends the remaining bytes in the given buffer as a single
- * datagram. The result of the operation, obtained by invoking the
- * {@code Future}'s {@link Future#get() get} method, is the
- * number of bytes sent.
- *
- * <p> The datagram is transferred from the byte buffer as if by a regular
- * {@link AsynchronousByteChannel#write write} operation.
- *
- * <p> This method may only be invoked if this channel is connected,
- * in which case it sends datagrams directly to the socket's peer. Otherwise
- * it behaves exactly as specified in the {@link
- * AsynchronousByteChannel} interface.
- *
- * <p> If a timeout is specified and the timeout elapses before the operation
- * completes then the operation completes with the exception {@link
- * InterruptedByTimeoutException}. When a timeout elapses then the state of
- * the {@link ByteBuffer} is not defined. The buffers should be discarded or
- * at least care must be taken to ensure that the buffer is not accessed
- * while the channel remains open.
- *
- * @param src
- * The buffer containing the datagram to be sent
- * @param timeout
- * The timeout, or {@code 0L} for no timeout
- * @param unit
- * The time unit of the {@code timeout} argument
- * @param attachment
- * The object to attach to the I/O operation; can be {@code null}
- * @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return a {@code Future} object representing the pending result
- *
- * @throws IllegalArgumentException
- * If the timeout is negative
- * @throws NotYetConnectedException
- * If this channel is not connected
- * @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
- */
- public abstract <A> Future<Integer> write(ByteBuffer src,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
- /**
- * @throws NotYetConnectedException
- * If this channel is not connected
- * @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
- */
- @Override
- public final <A> Future<Integer> write(ByteBuffer src,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
- {
- return write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
- }
+ public abstract Future<Integer> read(ByteBuffer dst);
/**
* @throws NotYetConnectedException
* If this channel is not connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
@Override
- public final Future<Integer> write(ByteBuffer src) {
- return write(src, 0L, TimeUnit.MILLISECONDS, null, null);
- }
+ public abstract <A> void write(ByteBuffer src,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
+
+
+ /**
+ * @throws NotYetConnectedException
+ * If this channel is not connected
+ * @throws ShutdownChannelGroupException
+ * If the channel group has terminated
+ */
+ @Override
+ public abstract Future<Integer> write(ByteBuffer src);
}
--- a/jdk/src/share/classes/java/nio/channels/AsynchronousFileChannel.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/AsynchronousFileChannel.java Sun Aug 23 12:53:45 2009 +0100
@@ -48,7 +48,12 @@
*
* <p> An asynchronous file channel does not have a <i>current position</i>
* within the file. Instead, the file position is specified to each read and
- * write operation.
+ * write methd that initiate asynchronous operations. A {@link CompletionHandler}
+ * is specified as a parameter and is invoked to consume the result of the I/O
+ * operation. This class also defines read and write methods that initiate
+ * asynchronous operations, returning a {@link Future} to represent the pending
+ * result of the operation. The {@code Future} may be used to check if the
+ * operation has completed, to wait for its completion.
*
* <p> In addition to read and write operations, this class defines the
* following operations: </p>
@@ -59,18 +64,11 @@
* out</i>} to the underlying storage device, ensuring that data are not
* lost in the event of a system crash. </p></li>
*
- * <li><p> A region of a file may be {@link FileLock <i>locked</i>}
- * against access by other programs. </p></li>
+ * <li><p> A region of a file may be {@link #lock <i>locked</i>} against
+ * access by other programs. </p></li>
*
* </ul>
*
- * <p> The {@link #read read}, {@link #write write}, and {@link #lock lock}
- * methods defined by this class are asynchronous and return a {@link Future}
- * to represent the pending result of the operation. This may be used to check
- * if the operation has completed, to wait for its completion, and to retrieve
- * the result. These method may optionally specify a {@link CompletionHandler}
- * that is invoked to consume the result of the I/O operation when it completes.
- *
* <p> An {@code AsynchronousFileChannel} is associated with a thread pool to
* which tasks are submitted to handle I/O events and dispatch to completion
* handlers that consume the results of I/O operations on the channel. The
@@ -123,22 +121,6 @@
}
/**
- * Closes this channel.
- *
- * <p> If this channel is associated with its own thread pool then closing
- * the channel causes the thread pool to shutdown after all actively
- * executing completion handlers have completed. No attempt is made to stop
- * or interrupt actively completion handlers.
- *
- * <p> This method otherwise behaves exactly as specified by the {@link
- * AsynchronousChannel} interface.
- *
- * @throws IOException {@inheritDoc}
- */
- @Override
- public abstract void close() throws IOException;
-
- /**
* Opens or creates a file for reading and/or writing, returning an
* asynchronous file channel to access the file.
*
@@ -215,9 +197,8 @@
* should be taken when configuring the {@code Executor}. Minimally it
* should support an unbounded work queue and should not run tasks on the
* caller thread of the {@link ExecutorService#execute execute} method.
- * {@link #close Closing} the channel results in the orderly {@link
- * ExecutorService#shutdown shutdown} of the executor service. Shutting down
- * the executor service by other means results in unspecified behavior.
+ * Shutting down the executor service while the channel is open results in
+ * unspecified behavior.
*
* <p> The {@code attrs} parameter is an optional array of file {@link
* FileAttribute file-attributes} to set atomically when creating the file.
@@ -276,7 +257,8 @@
* <p> An invocation of this method behaves in exactly the same way as the
* invocation
* <pre>
- * ch.{@link #open(Path,Set,ExecutorService,FileAttribute[]) open}(file, opts, null, new FileAttribute<?>[0]);
+ * ch.{@link #open(Path,Set,ExecutorService,FileAttribute[])
+ * open}(file, opts, null, new FileAttribute<?>[0]);
* </pre>
* where {@code opts} is a {@code Set} containing the options specified to
* this method.
@@ -405,10 +387,11 @@
/**
* Acquires a lock on the given region of this channel's file.
*
- * <p> This method initiates an operation to acquire a lock on the given region
- * of this channel's file. The method returns a {@code Future} representing
- * the pending result of the operation. Its {@link Future#get() get}
- * method returns the {@link FileLock} on successful completion.
+ * <p> This method initiates an operation to acquire a lock on the given
+ * region of this channel's file. The {@code handler} parameter is a
+ * completion handler that is invoked when the lock is acquired (or the
+ * operation fails). The result passed to the completion handler is the
+ * resulting {@code FileLock}.
*
* <p> The region specified by the {@code position} and {@code size}
* parameters need not be contained within, or even overlap, the actual
@@ -455,9 +438,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return a {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws OverlappingFileLockException
* If a lock that overlaps the requested region is already held by
@@ -466,26 +447,24 @@
* @throws IllegalArgumentException
* If the preconditions on the parameters do not hold
* @throws NonReadableChannelException
- * If {@code shared} is true this channel but was not opened for reading
+ * If {@code shared} is true but this channel was not opened for reading
* @throws NonWritableChannelException
* If {@code shared} is false but this channel was not opened for writing
- * @throws ShutdownChannelGroupException
- * If a handler is specified, the channel is closed, and the channel
- * was originally created with its own thread pool
*/
- public abstract <A> Future<FileLock> lock(long position,
- long size,
- boolean shared,
- A attachment,
- CompletionHandler<FileLock,? super A> handler);
+ public abstract <A> void lock(long position,
+ long size,
+ boolean shared,
+ A attachment,
+ CompletionHandler<FileLock,? super A> handler);
/**
* Acquires an exclusive lock on this channel's file.
*
- * <p> This method initiates an operation to acquire an exclusive lock on this
- * channel's file. The method returns a {@code Future} representing
- * the pending result of the operation. Its {@link Future#get() get}
- * method returns the {@link FileLock} on successful completion.
+ * <p> This method initiates an operation to acquire a lock on the given
+ * region of this channel's file. The {@code handler} parameter is a
+ * completion handler that is invoked when the lock is acquired (or the
+ * operation fails). The result passed to the completion handler is the
+ * resulting {@code FileLock}.
*
* <p> An invocation of this method of the form {@code ch.lock(att,handler)}
* behaves in exactly the same way as the invocation
@@ -496,7 +475,70 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
+ * The handler for consuming the result
+ *
+ * @throws OverlappingFileLockException
+ * If a lock is already held by this Java virtual machine, or there
+ * is already a pending attempt to lock a region
+ * @throws NonWritableChannelException
+ * If this channel was not opened for writing
+ */
+ public final <A> void lock(A attachment,
+ CompletionHandler<FileLock,? super A> handler)
+ {
+ lock(0L, Long.MAX_VALUE, false, attachment, handler);
+ }
+
+ /**
+ * Acquires a lock on the given region of this channel's file.
+ *
+ * <p> This method initiates an operation to acquire a lock on the given
+ * region of this channel's file. The method behaves in exactly the same
+ * manner as the {@link #lock(long, long, boolean, Object, CompletionHandler)}
+ * method except that instead of specifying a completion handler, this
+ * method returns a {@code Future} representing the pending result. The
+ * {@code Future}'s {@link Future#get() get} method returns the {@link
+ * FileLock} on successful completion.
+ *
+ * @param position
+ * The position at which the locked region is to start; must be
+ * non-negative
+ * @param size
+ * The size of the locked region; must be non-negative, and the sum
+ * {@code position} + {@code size} must be non-negative
+ * @param shared
+ * {@code true} to request a shared lock, in which case this
+ * channel must be open for reading (and possibly writing);
+ * {@code false} to request an exclusive lock, in which case this
+ * channel must be open for writing (and possibly reading)
+ *
+ * @return a {@code Future} object representing the pending result
+ *
+ * @throws OverlappingFileLockException
+ * If a lock is already held by this Java virtual machine, or there
+ * is already a pending attempt to lock a region
+ * @throws IllegalArgumentException
+ * If the preconditions on the parameters do not hold
+ * @throws NonReadableChannelException
+ * If {@code shared} is true but this channel was not opened for reading
+ * @throws NonWritableChannelException
+ * If {@code shared} is false but this channel was not opened for writing
+ */
+ public abstract Future<FileLock> lock(long position, long size, boolean shared);
+
+ /**
+ * Acquires an exclusive lock on this channel's file.
+ *
+ * <p> This method initiates an operation to acquire an exclusive lock on this
+ * channel's file. The method returns a {@code Future} representing the
+ * pending result of the operation. The {@code Future}'s {@link Future#get()
+ * get} method returns the {@link FileLock} on successful completion.
+ *
+ * <p> An invocation of this method behaves in exactly the same way as the
+ * invocation
+ * <pre>
+ * ch.{@link #lock(long,long,boolean) lock}(0L, Long.MAX_VALUE, false)
+ * </pre>
*
* @return a {@code Future} object representing the pending result
*
@@ -505,40 +547,9 @@
* is already a pending attempt to lock a region
* @throws NonWritableChannelException
* If this channel was not opened for writing
- * @throws ShutdownChannelGroupException
- * If a handler is specified, the channel is closed, and the channel
- * was originally created with its own thread pool
- */
- public final <A> Future<FileLock> lock(A attachment,
- CompletionHandler<FileLock,? super A> handler)
- {
- return lock(0L, Long.MAX_VALUE, false, attachment, handler);
- }
-
- /**
- * Acquires an exclusive lock on this channel's file.
- *
- * <p> This method initiates an operation to acquire an exclusive lock on this
- * channel's file. The method returns a {@code Future} representing the
- * pending result of the operation. Its {@link Future#get() get} method
- * returns the {@link FileLock} on successful completion.
- *
- * <p> An invocation of this method behaves in exactly the same way as the
- * invocation
- * <pre>
- * ch.{@link #lock(long,long,boolean,Object,CompletionHandler) lock}(0L, Long.MAX_VALUE, false, null, null)
- * </pre>
- *
- * @return A {@code Future} object representing the pending result
- *
- * @throws OverlappingFileLockException
- * If a lock is already held by this Java virtual machine, or there
- * is already a pending attempt to lock a region
- * @throws NonWritableChannelException
- * If this channel was not opened for writing
*/
public final Future<FileLock> lock() {
- return lock(0L, Long.MAX_VALUE, false, null, null);
+ return lock(0L, Long.MAX_VALUE, false);
}
/**
@@ -576,7 +587,7 @@
* blocked in this method and is attempting to lock an overlapping
* region of the same file
* @throws NonReadableChannelException
- * If {@code shared} is true this channel but was not opened for reading
+ * If {@code shared} is true but this channel was not opened for reading
* @throws NonWritableChannelException
* If {@code shared} is false but this channel was not opened for writing
*
@@ -629,11 +640,10 @@
* starting at the given file position.
*
* <p> This method initiates the reading of a sequence of bytes from this
- * channel into the given buffer, starting at the given file position. This
- * method returns a {@code Future} representing the pending result of the
- * operation. The Future's {@link Future#get() get} method returns the
- * number of bytes read or {@code -1} if the given position is greater than
- * or equal to the file's size at the time that the read is attempted.
+ * channel into the given buffer, starting at the given file position. The
+ * result of the read is the number of bytes read or {@code -1} if the given
+ * position is greater than or equal to the file's size at the time that the
+ * read is attempted.
*
* <p> This method works in the same manner as the {@link
* AsynchronousByteChannel#read(ByteBuffer,Object,CompletionHandler)}
@@ -649,22 +659,17 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the position is negative or the buffer is read-only
* @throws NonReadableChannelException
* If this channel was not opened for reading
- * @throws ShutdownChannelGroupException
- * If a handler is specified, the channel is closed, and the channel
- * was originally created with its own thread pool
*/
- public abstract <A> Future<Integer> read(ByteBuffer dst,
- long position,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ public abstract <A> void read(ByteBuffer dst,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* Reads a sequence of bytes from this channel into the given buffer,
@@ -673,13 +678,15 @@
* <p> This method initiates the reading of a sequence of bytes from this
* channel into the given buffer, starting at the given file position. This
* method returns a {@code Future} representing the pending result of the
- * operation. The Future's {@link Future#get() get} method returns the
- * number of bytes read or {@code -1} if the given position is greater
+ * operation. The {@code Future}'s {@link Future#get() get} method returns
+ * the number of bytes read or {@code -1} if the given position is greater
* than or equal to the file's size at the time that the read is attempted.
*
- * <p> This method is equivalent to invoking {@link
- * #read(ByteBuffer,long,Object,CompletionHandler)} with the {@code attachment}
- * and handler parameters set to {@code null}.
+ * <p> This method works in the same manner as the {@link
+ * AsynchronousByteChannel#read(ByteBuffer)} method, except that bytes are
+ * read starting at the given file position. If the given file position is
+ * greater than the file's size at the time that the read is attempted then
+ * no bytes are read.
*
* @param dst
* The buffer into which bytes are to be transferred
@@ -694,20 +701,12 @@
* @throws NonReadableChannelException
* If this channel was not opened for reading
*/
- public final Future<Integer> read(ByteBuffer dst, long position) {
- return read(dst, position, null, null);
- }
+ public abstract Future<Integer> read(ByteBuffer dst, long position);
/**
* Writes a sequence of bytes to this channel from the given buffer, starting
* at the given file position.
*
- * <p> This method initiates the writing of a sequence of bytes to this channel
- * from the given buffer, starting at the given file position. The method
- * returns a {@code Future} representing the pending result of the write
- * operation. The Future's {@link Future#get() get} method returns the
- * number of bytes written.
- *
* <p> This method works in the same manner as the {@link
* AsynchronousByteChannel#write(ByteBuffer,Object,CompletionHandler)}
* method, except that bytes are written starting at the given file position.
@@ -724,36 +723,35 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the position is negative
* @throws NonWritableChannelException
* If this channel was not opened for writing
- * @throws ShutdownChannelGroupException
- * If a handler is specified, the channel is closed, and the channel
- * was originally created with its own thread pool
*/
- public abstract <A> Future<Integer> write(ByteBuffer src,
- long position,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ public abstract <A> void write(ByteBuffer src,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* Writes a sequence of bytes to this channel from the given buffer, starting
* at the given file position.
*
- * <p> This method initiates the writing of a sequence of bytes to this channel
- * from the given buffer, starting at the given file position. The method
- * returns a {@code Future} representing the pending result of the write
- * operation. The Future's {@link Future#get() get} method returns the
- * number of bytes written.
+ * <p> This method initiates the writing of a sequence of bytes to this
+ * channel from the given buffer, starting at the given file position. The
+ * method returns a {@code Future} representing the pending result of the
+ * write operation. The {@code Future}'s {@link Future#get() get} method
+ * returns the number of bytes written.
*
- * <p> This method is equivalent to invoking {@link
- * #write(ByteBuffer,long,Object,CompletionHandler)} with the {@code attachment}
- * and handler parameters set to {@code null}.
+ * <p> This method works in the same manner as the {@link
+ * AsynchronousByteChannel#write(ByteBuffer)} method, except that bytes are
+ * written starting at the given file position. If the given position is
+ * greater than the file's size, at the time that the write is attempted,
+ * then the file will be grown to accommodate the new bytes; the values of
+ * any bytes between the previous end-of-file and the newly-written bytes
+ * are unspecified.
*
* @param src
* The buffer from which bytes are to be transferred
@@ -768,7 +766,5 @@
* @throws NonWritableChannelException
* If this channel was not opened for writing
*/
- public final Future<Integer> write(ByteBuffer src, long position) {
- return write(src, position, null, null);
- }
+ public abstract Future<Integer> write(ByteBuffer src, long position);
}
--- a/jdk/src/share/classes/java/nio/channels/AsynchronousServerSocketChannel.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/AsynchronousServerSocketChannel.java Sun Aug 23 12:53:45 2009 +0100
@@ -85,9 +85,6 @@
* public void failed(Throwable exc, Void att) {
* ...
* }
- * public void cancelled(Void att) {
- * ...
- * }
* });
* </pre>
*
@@ -240,11 +237,11 @@
/**
* Accepts a connection.
*
- * <p> This method initiates accepting a connection made to this channel's
- * socket, returning a {@link Future} representing the pending result
- * of the operation. The {@code Future}'s {@link Future#get() get}
- * method will return the {@link AsynchronousSocketChannel} for the new
- * connection on successful completion.
+ * <p> This method initiates an asynchronous operation to accept a
+ * connection made to this channel's socket. The {@code handler} parameter is
+ * a completion handler that is invoked when a connection is accepted (or
+ * the operation fails). The result passed to the completion handler is
+ * the {@link AsynchronousSocketChannel} to the new connection.
*
* <p> When a new connection is accepted then the resulting {@code
* AsynchronousSocketChannel} will be bound to the same {@link
@@ -269,35 +266,35 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return an <tt>Future</tt> object representing the pending result
+ * The handler for consuming the result
*
* @throws AcceptPendingException
* If an accept operation is already in progress on this channel
* @throws NotYetBoundException
* If this channel's socket has not yet been bound
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<AsynchronousSocketChannel>
- accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
+ public abstract <A> void accept(A attachment,
+ CompletionHandler<AsynchronousSocketChannel,? super A> handler);
/**
* Accepts a connection.
*
- * <p> This method is equivalent to invoking {@link
- * #accept(Object,CompletionHandler)} with the {@code attachment}
- * and {@code handler} parameters set to {@code null}.
+ * <p> This method initiates an asynchronous operation to accept a
+ * connection made to this channel's socket. The method behaves in exactly
+ * the same manner as the {@link #accept(Object, CompletionHandler)} method
+ * except that instead of specifying a completion handler, this method
+ * returns a {@code Future} representing the pending result. The {@code
+ * Future}'s {@link Future#get() get} method returns the {@link
+ * AsynchronousSocketChannel} to the new connection on successful completion.
*
- * @return an <tt>Future</tt> object representing the pending result
+ * @return a {@code Future} object representing the pending result
*
* @throws AcceptPendingException
* If an accept operation is already in progress on this channel
* @throws NotYetBoundException
* If this channel's socket has not yet been bound
*/
- public final Future<AsynchronousSocketChannel> accept() {
- return accept(null, null);
- }
+ public abstract Future<AsynchronousSocketChannel> accept();
}
--- a/jdk/src/share/classes/java/nio/channels/AsynchronousSocketChannel.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/AsynchronousSocketChannel.java Sun Aug 23 12:53:45 2009 +0100
@@ -274,14 +274,11 @@
/**
* Connects this channel.
*
- * <p> This method initiates an operation to connect this channel, returning
- * a {@code Future} representing the pending result of the operation. If
- * the connection is successfully established then the {@code Future}'s
- * {@link Future#get() get} method will return {@code null}. If the
- * connection cannot be established then the channel is closed. In that case,
- * invoking the {@code get} method throws {@link
- * java.util.concurrent.ExecutionException} with an {@code IOException} as
- * the cause.
+ * <p> This method initiates an operation to connect this channel. The
+ * {@code handler} parameter is a completion handler that is invoked when
+ * the connection is successfully established or connection cannot be
+ * established. If the connection cannot be established then the channel is
+ * closed.
*
* <p> This method performs exactly the same security checks as the {@link
* java.net.Socket} class. That is, if a security manager has been
@@ -294,9 +291,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws UnresolvedAddressException
* If the given remote address is not fully resolved
@@ -307,23 +302,26 @@
* @throws ConnectionPendingException
* If a connection operation is already in progress on this channel
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
* @throws SecurityException
* If a security manager has been installed
* and it does not permit access to the given remote endpoint
*
* @see #getRemoteAddress
*/
- public abstract <A> Future<Void> connect(SocketAddress remote,
- A attachment,
- CompletionHandler<Void,? super A> handler);
+ public abstract <A> void connect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler);
/**
* Connects this channel.
*
- * <p> This method is equivalent to invoking {@link
- * #connect(SocketAddress,Object,CompletionHandler)} with the {@code attachment}
- * and handler parameters set to {@code null}.
+ * <p> This method initiates an operation to connect this channel. This
+ * method behaves in exactly the same manner as the {@link
+ * #connect(SocketAddress, Object, CompletionHandler)} method except that
+ * instead of specifying a completion handler, this method returns a {@code
+ * Future} representing the pending result. The {@code Future}'s {@link
+ * Future#get() get} method returns {@code null} on successful completion.
*
* @param remote
* The remote address to which this channel is to be connected
@@ -342,18 +340,17 @@
* If a security manager has been installed
* and it does not permit access to the given remote endpoint
*/
- public final Future<Void> connect(SocketAddress remote) {
- return connect(remote, null, null);
- }
+ public abstract Future<Void> connect(SocketAddress remote);
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
- * <p> This method initiates the reading of a sequence of bytes from this
- * channel into the given buffer, returning a {@code Future} representing
- * the pending result of the operation. The {@code Future}'s {@link
- * Future#get() get} method returns the number of bytes read or {@code -1}
- * if all bytes have been read and channel has reached end-of-stream.
+ * <p> This method initiates an asynchronous read operation to read a
+ * sequence of bytes from this channel into the given buffer. The {@code
+ * handler} parameter is a completion handler that is invoked when the read
+ * operation completes (or fails). The result passed to the completion
+ * handler is the number of bytes read or {@code -1} if no bytes could be
+ * read because the channel has reached end-of-stream.
*
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then the operation completes with the exception {@link
@@ -376,9 +373,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the {@code timeout} parameter is negative or the buffer is
@@ -388,13 +383,13 @@
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<Integer> read(ByteBuffer dst,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ public abstract <A> void read(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* @throws IllegalArgumentException {@inheritDoc}
@@ -402,14 +397,14 @@
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
@Override
- public final <A> Future<Integer> read(ByteBuffer dst,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void read(ByteBuffer dst,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- return read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
+ read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
/**
@@ -419,16 +414,18 @@
* If this channel is not yet connected
*/
@Override
- public final Future<Integer> read(ByteBuffer dst) {
- return read(dst, 0L, TimeUnit.MILLISECONDS, null, null);
- }
+ public abstract Future<Integer> read(ByteBuffer dst);
/**
* Reads a sequence of bytes from this channel into a subsequence of the
* given buffers. This operation, sometimes called a <em>scattering read</em>,
* is often useful when implementing network protocols that group data into
* segments consisting of one or more fixed-length headers followed by a
- * variable-length body.
+ * variable-length body. The {@code handler} parameter is a completion
+ * handler that is invoked when the read operation completes (or fails). The
+ * result passed to the completion handler is the number of bytes read or
+ * {@code -1} if no bytes could be read because the channel has reached
+ * end-of-stream.
*
* <p> This method initiates a read of up to <i>r</i> bytes from this channel,
* where <i>r</i> is the total number of bytes remaining in the specified
@@ -456,11 +453,6 @@
* I/O operation is performed with the maximum number of buffers allowed by
* the operating system.
*
- * <p> The return value from this method is a {@code Future} representing
- * the pending result of the operation. The {@code Future}'s {@link
- * Future#get() get} method returns the number of bytes read or {@code -1L}
- * if all bytes have been read and the channel has reached end-of-stream.
- *
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then it completes with the exception {@link
* InterruptedByTimeoutException}. Where a timeout occurs, and the
@@ -485,9 +477,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IndexOutOfBoundsException
* If the pre-conditions for the {@code offset} and {@code length}
@@ -500,23 +490,24 @@
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<Long> read(ByteBuffer[] dsts,
- int offset,
- int length,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Long,? super A> handler);
+ public abstract <A> void read(ByteBuffer[] dsts,
+ int offset,
+ int length,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Long,? super A> handler);
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
- * <p> This method initiates the writing of a sequence of bytes to this channel
- * from the given buffer, returning a {@code Future} representing the
- * pending result of the operation. The {@code Future}'s {@link Future#get()
- * get} method will return the number of bytes written.
+ * <p> This method initiates an asynchronous write operation to write a
+ * sequence of bytes to this channel from the given buffer. The {@code
+ * handler} parameter is a completion handler that is invoked when the write
+ * operation completes (or fails). The result passed to the completion
+ * handler is the number of bytes written.
*
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then it completes with the exception {@link
@@ -539,9 +530,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IllegalArgumentException
* If the {@code timeout} parameter is negative
@@ -550,28 +539,28 @@
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<Integer> write(ByteBuffer src,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler);
+ public abstract <A> void write(ByteBuffer src,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
/**
* @throws WritePendingException {@inheritDoc}
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
@Override
- public final <A> Future<Integer> write(ByteBuffer src,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void write(ByteBuffer src,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- return write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
+ write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
/**
@@ -580,16 +569,16 @@
* If this channel is not yet connected
*/
@Override
- public final Future<Integer> write(ByteBuffer src) {
- return write(src, 0L, TimeUnit.MILLISECONDS, null, null);
- }
+ public abstract Future<Integer> write(ByteBuffer src);
/**
* Writes a sequence of bytes to this channel from a subsequence of the given
* buffers. This operation, sometimes called a <em>gathering write</em>, is
* often useful when implementing network protocols that group data into
* segments consisting of one or more fixed-length headers followed by a
- * variable-length body.
+ * variable-length body. The {@code handler} parameter is a completion
+ * handler that is invoked when the write operation completes (or fails).
+ * The result passed to the completion handler is the number of bytes written.
*
* <p> This method initiates a write of up to <i>r</i> bytes to this channel,
* where <i>r</i> is the total number of bytes remaining in the specified
@@ -616,10 +605,6 @@
* remaining), exceeds this limit, then the I/O operation is performed with
* the maximum number of buffers allowed by the operating system.
*
- * <p> The return value from this method is a {@code Future} representing
- * the pending result of the operation. The {@code Future}'s {@link
- * Future#get() get} method will return the number of bytes written.
- *
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then it completes with the exception {@link
* InterruptedByTimeoutException}. Where a timeout occurs, and the
@@ -644,9 +629,7 @@
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
- * The handler for consuming the result; can be {@code null}
- *
- * @return A {@code Future} object representing the pending result
+ * The handler for consuming the result
*
* @throws IndexOutOfBoundsException
* If the pre-conditions for the {@code offset} and {@code length}
@@ -658,13 +641,13 @@
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
- * If a handler is specified, and the channel group is shutdown
+ * If the channel group has terminated
*/
- public abstract <A> Future<Long> write(ByteBuffer[] srcs,
- int offset,
- int length,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Long,? super A> handler);
+ public abstract <A> void write(ByteBuffer[] srcs,
+ int offset,
+ int length,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Long,? super A> handler);
}
--- a/jdk/src/share/classes/java/nio/channels/CompletionHandler.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/CompletionHandler.java Sun Aug 23 12:53:45 2009 +0100
@@ -32,11 +32,9 @@
* handler to be specified to consume the result of an asynchronous operation.
* The {@link #completed completed} method is invoked when the I/O operation
* completes successfully. The {@link #failed failed} method is invoked if the
- * I/O operations fails. The {@link #cancelled cancelled} method is invoked when
- * the I/O operation is cancelled by invoking the {@link
- * java.util.concurrent.Future#cancel cancel} method. The implementations of
- * these methods should complete in a timely manner so as to avoid keeping the
- * invoking thread from dispatching to other completion handlers.
+ * I/O operations fails. The implementations of these methods should complete
+ * in a timely manner so as to avoid keeping the invoking thread from dispatching
+ * to other completion handlers.
*
* @param <V> The result type of the I/O operation
* @param <A> The type of the object attached to the I/O operation
@@ -65,13 +63,4 @@
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
-
- /**
- * Invoked when an operation is cancelled by invoking the {@link
- * java.util.concurrent.Future#cancel cancel} method.
- *
- * @param attachment
- * The object attached to the I/O operation when it was initiated.
- */
- void cancelled(A attachment);
}
--- a/jdk/src/share/classes/java/nio/channels/exceptions Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/java/nio/channels/exceptions Sun Aug 23 12:53:45 2009 +0100
@@ -190,5 +190,5 @@
gen ShutdownChannelGroupException "
* Unchecked exception thrown when an attempt is made to construct a channel in
* a group that is shutdown or the completion handler for an I/O operation
- * cannot be invoked because the channel group is shutdown." \
+ * cannot be invoked because the channel group has terminated." \
-3903801676350154157L
--- a/jdk/src/share/classes/sun/nio/ch/AbstractFuture.java Sat Aug 22 17:40:18 2009 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,63 +0,0 @@
-/*
- * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Sun designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Sun in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- */
-
-package sun.nio.ch;
-
-import java.nio.channels.AsynchronousChannel;
-import java.util.concurrent.Future;
-
-/**
- * Base implementation of Future used for asynchronous I/O
- */
-
-abstract class AbstractFuture<V,A>
- implements Future<V>
-{
- private final AsynchronousChannel channel;
- private final A attachment;
-
- protected AbstractFuture(AsynchronousChannel channel, A attachment) {
- this.channel = channel;
- this.attachment = attachment;
- }
-
- final AsynchronousChannel channel() {
- return channel;
- }
-
- final A attachment() {
- return attachment;
- }
-
- /**
- * Returns the result of the operation if it has completed successfully.
- */
- abstract V value();
-
- /**
- * Returns the exception if the operation has failed.
- */
- abstract Throwable exception();
-}
--- a/jdk/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -32,8 +32,8 @@
import java.io.FileDescriptor;
import java.util.Queue;
import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.security.PrivilegedAction;
import java.security.AccessController;
import java.security.AccessControlContext;
@@ -65,11 +65,8 @@
private final Queue<Runnable> taskQueue;
// group shutdown
- // shutdownLock is RW lock so as to allow for concurrent queuing of tasks
- // when using a fixed thread pool.
- private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+ private final AtomicBoolean shutdown = new AtomicBoolean();
private final Object shutdownNowLock = new Object();
- private volatile boolean shutdown;
private volatile boolean terminateInitiated;
AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
@@ -214,7 +211,7 @@
@Override
public final boolean isShutdown() {
- return shutdown;
+ return shutdown.get();
}
@Override
@@ -260,17 +257,10 @@
@Override
public final void shutdown() {
- shutdownLock.writeLock().lock();
- try {
- if (shutdown) {
- // already shutdown
- return;
- }
- shutdown = true;
- } finally {
- shutdownLock.writeLock().unlock();
+ if (shutdown.getAndSet(true)) {
+ // already shutdown
+ return;
}
-
// if there are channels in the group then shutdown will continue
// when the last channel is closed
if (!isEmpty()) {
@@ -289,12 +279,7 @@
@Override
public final void shutdownNow() throws IOException {
- shutdownLock.writeLock().lock();
- try {
- shutdown = true;
- } finally {
- shutdownLock.writeLock().unlock();
- }
+ shutdown.set(true);
synchronized (shutdownNowLock) {
if (!terminateInitiated) {
terminateInitiated = true;
@@ -305,6 +290,18 @@
}
}
+ /**
+ * For use by AsynchronousFileChannel to release resources without shutting
+ * down the thread pool.
+ */
+ final void detachFromThreadPool() {
+ if (shutdown.getAndSet(true))
+ throw new AssertionError("Already shutdown");
+ if (!isEmpty())
+ throw new AssertionError("Group not empty");
+ shutdownHandlerTasks();
+ }
+
@Override
public final boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException
--- a/jdk/src/share/classes/sun/nio/ch/AsynchronousFileChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/AsynchronousFileChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -25,8 +25,10 @@
package sun.nio.ch;
+import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.*;
import java.io.FileDescriptor;
import java.io.IOException;
@@ -101,6 +103,33 @@
// -- file locking --
+ abstract <A> Future<FileLock> implLock(long position,
+ long size,
+ boolean shared,
+ A attachment,
+ CompletionHandler<FileLock,? super A> handler);
+
+ @Override
+ public final Future<FileLock> lock(long position,
+ long size,
+ boolean shared)
+
+ {
+ return implLock(position, size, shared, null, null);
+ }
+
+ @Override
+ public final <A> void lock(long position,
+ long size,
+ boolean shared,
+ A attachment,
+ CompletionHandler<FileLock,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implLock(position, size, shared, attachment, handler);
+ }
+
private volatile FileLockTable fileLockTable;
final void ensureFileLockTableInitialized() throws IOException {
@@ -175,4 +204,50 @@
end();
}
}
+
+
+ // -- reading and writing --
+
+ abstract <A> Future<Integer> implRead(ByteBuffer dst,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
+
+ @Override
+ public final Future<Integer> read(ByteBuffer dst, long position) {
+ return implRead(dst, position, null, null);
+ }
+
+ @Override
+ public final <A> void read(ByteBuffer dst,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implRead(dst, position, attachment, handler);
+ }
+
+ abstract <A> Future<Integer> implWrite(ByteBuffer src,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler);
+
+
+ @Override
+ public final Future<Integer> write(ByteBuffer src, long position) {
+ return implWrite(src, position, null, null);
+ }
+
+ @Override
+ public final <A> void write(ByteBuffer src,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implWrite(src, position, attachment, handler);
+ }
}
--- a/jdk/src/share/classes/sun/nio/ch/AsynchronousServerSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/AsynchronousServerSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import sun.net.NetHooks;
@@ -108,6 +109,29 @@
implClose();
}
+ /**
+ * Invoked by accept to accept connection
+ */
+ abstract Future<AsynchronousSocketChannel>
+ implAccept(Object attachment,
+ CompletionHandler<AsynchronousSocketChannel,Object> handler);
+
+
+ @Override
+ public final Future<AsynchronousSocketChannel> accept() {
+ return implAccept(null, null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public final <A> void accept(A attachment,
+ CompletionHandler<AsynchronousSocketChannel,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implAccept(attachment, (CompletionHandler<AsynchronousSocketChannel,Object>)handler);
+ }
+
final boolean isAcceptKilled() {
return acceptKilled;
}
--- a/jdk/src/share/classes/sun/nio/ch/AsynchronousSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/AsynchronousSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -184,28 +184,53 @@
}
/**
+ * Invoked by connect to initiate the connect operation.
+ */
+ abstract <A> Future<Void> implConnect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler);
+
+ @Override
+ public final Future<Void> connect(SocketAddress remote) {
+ return implConnect(remote, null, null);
+ }
+
+ @Override
+ public final <A> void connect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implConnect(remote, attachment, handler);
+ }
+
+ /**
* Invoked by read to initiate the I/O operation.
*/
- abstract <V extends Number,A> Future<V> readImpl(ByteBuffer[] dsts,
- boolean isScatteringRead,
+ abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
- private <V extends Number,A> Future<V> read(ByteBuffer[] dsts,
- boolean isScatteringRead,
+ private <V extends Number,A> Future<V> read(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
- A attachment,
+ A att,
CompletionHandler<V,? super A> handler)
{
if (!isOpen()) {
- CompletedFuture<V,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable e = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(e);
+ Invoker.invoke(this, handler, att, null, e);
+ return null;
}
if (remoteAddress == null)
@@ -213,13 +238,13 @@
if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
- boolean hasSpaceToRead = isScatteringRead || dsts[0].hasRemaining();
+ boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
boolean shutdown = false;
// check and update state
synchronized (readLock) {
if (readKilled)
- throw new RuntimeException("Reading not allowed due to timeout or cancellation");
+ throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
if (reading)
throw new ReadPendingException();
if (readShutdown) {
@@ -234,44 +259,53 @@
// immediately complete with -1 if shutdown for read
// immediately complete with 0 if no space remaining
if (shutdown || !hasSpaceToRead) {
- CompletedFuture<V,A> result;
+ Number result;
if (isScatteringRead) {
- Long value = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, value, attachment);
+ result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
} else {
- int value = (shutdown) ? -1 : 0;
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, value, attachment);
+ result = (shutdown) ? -1 : 0;
}
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withResult((V)result);
+ Invoker.invoke(this, handler, att, (V)result, null);
+ return null;
}
- return readImpl(dsts, isScatteringRead, timeout, unit, attachment, handler);
+ return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
+ }
+
+ @Override
+ public final Future<Integer> read(ByteBuffer dst) {
+ if (dst.isReadOnly())
+ throw new IllegalArgumentException("Read-only buffer");
+ return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
- public final <A> Future<Integer> read(ByteBuffer dst,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void read(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
- ByteBuffer[] bufs = new ByteBuffer[1];
- bufs[0] = dst;
- return read(bufs, false, timeout, unit, attachment, handler);
+ read(false, dst, null, timeout, unit, attachment, handler);
}
@Override
- public final <A> Future<Long> read(ByteBuffer[] dsts,
- int offset,
- int length,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Long,? super A> handler)
+ public final <A> void read(ByteBuffer[] dsts,
+ int offset,
+ int length,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Long,? super A> handler)
{
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
@@ -279,39 +313,41 @@
if (bufs[i].isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
}
- return read(bufs, true, timeout, unit, attachment, handler);
+ read(true, null, bufs, timeout, unit, attachment, handler);
}
/**
* Invoked by write to initiate the I/O operation.
*/
- abstract <V extends Number,A> Future<V> writeImpl(ByteBuffer[] srcs,
- boolean isGatheringWrite,
+ abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
- private <V extends Number,A> Future<V> write(ByteBuffer[] srcs,
- boolean isGatheringWrite,
+ private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
- A attachment,
+ A att,
CompletionHandler<V,? super A> handler)
{
- boolean hasDataToWrite = isGatheringWrite || srcs[0].hasRemaining();
+ boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
boolean closed = false;
if (isOpen()) {
if (remoteAddress == null)
throw new NotYetConnectedException();
- if (timeout < 0L)
+ if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
// check and update state
synchronized (writeLock) {
if (writeKilled)
- throw new RuntimeException("Writing not allowed due to timeout or cancellation");
+ throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
if (writing)
throw new WritePendingException();
if (writeShutdown) {
@@ -327,52 +363,57 @@
// channel is closed or shutdown for write
if (closed) {
- CompletedFuture<V,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable e = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(e);
+ Invoker.invoke(this, handler, att, null, e);
+ return null;
}
// nothing to write so complete immediately
if (!hasDataToWrite) {
- CompletedFuture<V,A> result;
- if (isGatheringWrite) {
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, 0L, attachment);
- } else {
- result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, 0, attachment);
- }
- Invoker.invoke(handler, result);
- return result;
+ Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
+ if (handler == null)
+ return CompletedFuture.withResult((V)result);
+ Invoker.invoke(this, handler, att, (V)result, null);
+ return null;
}
- return writeImpl(srcs, isGatheringWrite, timeout, unit, attachment, handler);
+ return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
+ }
+
+ @Override
+ public final Future<Integer> write(ByteBuffer src) {
+ return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
- public final <A> Future<Integer> write(ByteBuffer src,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public final <A> void write(ByteBuffer src,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- ByteBuffer[] bufs = new ByteBuffer[1];
- bufs[0] = src;
- return write(bufs, false, timeout, unit, attachment, handler);
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ write(false, src, null, timeout, unit, attachment, handler);
}
@Override
- public final <A> Future<Long> write(ByteBuffer[] srcs,
- int offset,
- int length,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Long,? super A> handler)
+ public final <A> void write(ByteBuffer[] srcs,
+ int offset,
+ int length,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Long,? super A> handler)
{
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
srcs = Util.subsequence(srcs, offset, length);
- return write(srcs, true, timeout, unit, attachment, handler);
+ write(true, null, srcs, timeout, unit, attachment, handler);
}
@Override
@@ -461,7 +502,6 @@
}
@Override
- @SuppressWarnings("unchecked")
public final SocketAddress getRemoteAddress() throws IOException {
if (!isOpen())
throw new ClosedChannelException();
--- a/jdk/src/share/classes/sun/nio/ch/CompletedFuture.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/CompletedFuture.java Sun Aug 23 12:53:45 2009 +0100
@@ -25,7 +25,7 @@
package sun.nio.ch;
-import java.nio.channels.AsynchronousChannel;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.io.IOException;
@@ -35,39 +35,35 @@
* completed.
*/
-final class CompletedFuture<V,A>
- extends AbstractFuture<V,A>
-{
+final class CompletedFuture<V> implements Future<V> {
private final V result;
private final Throwable exc;
- private CompletedFuture(AsynchronousChannel channel,
- V result,
- Throwable exc,
- A attachment)
- {
- super(channel, attachment);
+ private CompletedFuture(V result, Throwable exc) {
this.result = result;
this.exc = exc;
}
@SuppressWarnings("unchecked")
- static <V,A> CompletedFuture<V,A> withResult(AsynchronousChannel channel,
- V result,
- A attachment)
- {
- return new CompletedFuture<V,A>(channel, result, null, attachment);
+ static <V> CompletedFuture<V> withResult(V result) {
+ return new CompletedFuture<V>(result, null);
}
@SuppressWarnings("unchecked")
- static <V,A> CompletedFuture<V,A> withFailure(AsynchronousChannel channel,
- Throwable exc,
- A attachment)
- {
+ static <V> CompletedFuture<V> withFailure(Throwable exc) {
// exception must be IOException or SecurityException
if (!(exc instanceof IOException) && !(exc instanceof SecurityException))
exc = new IOException(exc);
- return new CompletedFuture(channel, null, exc, attachment);
+ return new CompletedFuture(null, exc);
+ }
+
+ @SuppressWarnings("unchecked")
+ static <V> CompletedFuture<V> withResult(V result, Throwable exc) {
+ if (exc == null) {
+ return withResult(result);
+ } else {
+ return withFailure(exc);
+ }
}
@Override
@@ -100,14 +96,4 @@
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
-
- @Override
- Throwable exception() {
- return exc;
- }
-
- @Override
- V value() {
- return result;
- }
}
--- a/jdk/src/share/classes/sun/nio/ch/Invoker.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/Invoker.java Sun Aug 23 12:53:45 2009 +0100
@@ -117,33 +117,32 @@
* Invoke handler without checking the thread identity or number of handlers
* on the thread stack.
*/
- @SuppressWarnings("unchecked")
static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
- AbstractFuture<V,A> result)
+ A attachment,
+ V value,
+ Throwable exc)
{
- if (handler != null && !result.isCancelled()) {
- Throwable exc = result.exception();
- if (exc == null) {
- handler.completed(result.value(), result.attachment());
- } else {
- handler.failed(exc, result.attachment());
- }
+ if (exc == null) {
+ handler.completed(value, attachment);
+ } else {
+ handler.failed(exc, attachment);
+ }
- // clear interrupt
- Thread.interrupted();
- }
+ // clear interrupt
+ Thread.interrupted();
}
-
/**
- * Invoke handler after incrementing the invoke count.
+ * Invoke handler assuming thread identity already checked
*/
static <V,A> void invokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
CompletionHandler<V,? super A> handler,
- AbstractFuture<V,A> result)
+ A attachment,
+ V result,
+ Throwable exc)
{
myGroupAndInvokeCount.incrementInvokeCount();
- invokeUnchecked(handler, result);
+ Invoker.invokeUnchecked(handler, attachment, result, exc);
}
/**
@@ -151,64 +150,64 @@
* thread pool then the handler is invoked directly, otherwise it is
* invoked indirectly.
*/
- static <V,A> void invoke(CompletionHandler<V,? super A> handler,
- AbstractFuture<V,A> result)
+ static <V,A> void invoke(AsynchronousChannel channel,
+ CompletionHandler<V,? super A> handler,
+ A attachment,
+ V result,
+ Throwable exc)
{
- if (handler != null) {
- boolean invokeDirect = false;
- boolean identityOkay = false;
- GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
- if (thisGroupAndInvokeCount != null) {
- AsynchronousChannel channel = result.channel();
- if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
- identityOkay = true;
- if (identityOkay &&
- (thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
- {
- // group match
- invokeDirect = true;
- }
+ boolean invokeDirect = false;
+ boolean identityOkay = false;
+ GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
+ if (thisGroupAndInvokeCount != null) {
+ if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
+ identityOkay = true;
+ if (identityOkay &&
+ (thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
+ {
+ // group match
+ invokeDirect = true;
}
- if (invokeDirect) {
- thisGroupAndInvokeCount.incrementInvokeCount();
- invokeUnchecked(handler, result);
- } else {
- try {
- invokeIndirectly(handler, result);
- } catch (RejectedExecutionException ree) {
- // channel group shutdown; fallback to invoking directly
- // if the current thread has the right identity.
- if (identityOkay) {
- invokeUnchecked(handler, result);
- } else {
- throw new ShutdownChannelGroupException();
- }
+ }
+ if (invokeDirect) {
+ invokeDirect(thisGroupAndInvokeCount, handler, attachment, result, exc);
+ } else {
+ try {
+ invokeIndirectly(channel, handler, attachment, result, exc);
+ } catch (RejectedExecutionException ree) {
+ // channel group shutdown; fallback to invoking directly
+ // if the current thread has the right identity.
+ if (identityOkay) {
+ invokeDirect(thisGroupAndInvokeCount,
+ handler, attachment, result, exc);
+ } else {
+ throw new ShutdownChannelGroupException();
}
}
}
}
/**
- * Invokes the handler "indirectly" in the channel group's thread pool.
+ * Invokes the handler indirectly via the channel group's thread pool.
*/
- static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
- final AbstractFuture<V,A> result)
+ static <V,A> void invokeIndirectly(AsynchronousChannel channel,
+ final CompletionHandler<V,? super A> handler,
+ final A attachment,
+ final V result,
+ final Throwable exc)
{
- if (handler != null) {
- AsynchronousChannel channel = result.channel();
- try {
- ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
- public void run() {
- GroupAndInvokeCount thisGroupAndInvokeCount =
- myGroupAndInvokeCount.get();
- if (thisGroupAndInvokeCount != null)
- thisGroupAndInvokeCount.setInvokeCount(1);
- invokeUnchecked(handler, result);
- }
- });
- } catch (RejectedExecutionException ree) {
- throw new ShutdownChannelGroupException();
- }
+ try {
+ ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
+ public void run() {
+ GroupAndInvokeCount thisGroupAndInvokeCount =
+ myGroupAndInvokeCount.get();
+ if (thisGroupAndInvokeCount != null)
+ thisGroupAndInvokeCount.setInvokeCount(1);
+ invokeUnchecked(handler, attachment, result, exc);
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ throw new ShutdownChannelGroupException();
}
}
@@ -216,19 +215,19 @@
* Invokes the handler "indirectly" in the given Executor
*/
static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
- final AbstractFuture<V,A> result,
+ final A attachment,
+ final V value,
+ final Throwable exc,
Executor executor)
{
- if (handler != null) {
- try {
- executor.execute(new Runnable() {
- public void run() {
- invokeUnchecked(handler, result);
- }
- });
- } catch (RejectedExecutionException ree) {
- throw new ShutdownChannelGroupException();
- }
+ try {
+ executor.execute(new Runnable() {
+ public void run() {
+ invokeUnchecked(handler, attachment, value, exc);
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ throw new ShutdownChannelGroupException();
}
}
@@ -258,4 +257,52 @@
throw new ShutdownChannelGroupException();
}
}
+
+ /**
+ * Invoke handler with completed result. This method does not check the
+ * thread identity or the number of handlers on the thread stack.
+ */
+ static <V,A> void invokeUnchecked(PendingFuture<V,A> future) {
+ assert future.isDone();
+ CompletionHandler<V,? super A> handler = future.handler();
+ if (handler != null) {
+ invokeUnchecked(handler,
+ future.attachment(),
+ future.value(),
+ future.exception());
+ }
+ }
+
+ /**
+ * Invoke handler with completed result. If the current thread is in the
+ * channel group's thread pool then the handler is invoked directly,
+ * otherwise it is invoked indirectly.
+ */
+ static <V,A> void invoke(PendingFuture<V,A> future) {
+ assert future.isDone();
+ CompletionHandler<V,? super A> handler = future.handler();
+ if (handler != null) {
+ invoke(future.channel(),
+ handler,
+ future.attachment(),
+ future.value(),
+ future.exception());
+ }
+ }
+
+ /**
+ * Invoke handler with completed result. The handler is invoked indirectly,
+ * via the channel group's thread pool.
+ */
+ static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
+ assert future.isDone();
+ CompletionHandler<V,? super A> handler = future.handler();
+ if (handler != null) {
+ invokeIndirectly(future.channel(),
+ handler,
+ future.attachment(),
+ future.value(),
+ future.exception());
+ }
+ }
}
--- a/jdk/src/share/classes/sun/nio/ch/PendingFuture.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/PendingFuture.java Sun Aug 23 12:53:45 2009 +0100
@@ -34,13 +34,13 @@
* attachment of an additional arbitrary context object and a timer task.
*/
-final class PendingFuture<V,A>
- extends AbstractFuture<V,A>
-{
+final class PendingFuture<V,A> implements Future<V> {
private static final CancellationException CANCELLED =
new CancellationException();
+ private final AsynchronousChannel channel;
private final CompletionHandler<V,? super A> handler;
+ private final A attachment;
// true if result (or exception) is available
private volatile boolean haveResult;
@@ -56,14 +56,14 @@
// optional context object
private volatile Object context;
-
PendingFuture(AsynchronousChannel channel,
CompletionHandler<V,? super A> handler,
A attachment,
Object context)
{
- super(channel, attachment);
+ this.channel = channel;
this.handler = handler;
+ this.attachment = attachment;
this.context = context;
}
@@ -71,14 +71,31 @@
CompletionHandler<V,? super A> handler,
A attachment)
{
- super(channel, attachment);
+ this.channel = channel;
this.handler = handler;
+ this.attachment = attachment;
+ }
+
+ PendingFuture(AsynchronousChannel channel) {
+ this(channel, null, null);
+ }
+
+ PendingFuture(AsynchronousChannel channel, Object context) {
+ this(channel, null, null, context);
+ }
+
+ AsynchronousChannel channel() {
+ return channel;
}
CompletionHandler<V,? super A> handler() {
return handler;
}
+ A attachment() {
+ return attachment;
+ }
+
void setContext(Object context) {
this.context = context;
}
@@ -113,36 +130,45 @@
/**
* Sets the result, or a no-op if the result or exception is already set.
*/
- boolean setResult(V res) {
+ void setResult(V res) {
synchronized (this) {
if (haveResult)
- return false;
+ return;
result = res;
haveResult = true;
if (timeoutTask != null)
timeoutTask.cancel(false);
if (latch != null)
latch.countDown();
- return true;
}
}
/**
* Sets the result, or a no-op if the result or exception is already set.
*/
- boolean setFailure(Throwable x) {
+ void setFailure(Throwable x) {
if (!(x instanceof IOException) && !(x instanceof SecurityException))
x = new IOException(x);
synchronized (this) {
if (haveResult)
- return false;
+ return;
exc = x;
haveResult = true;
if (timeoutTask != null)
timeoutTask.cancel(false);
if (latch != null)
latch.countDown();
- return true;
+ }
+ }
+
+ /**
+ * Sets the result
+ */
+ void setResult(V res, Throwable x) {
+ if (x == null) {
+ setResult(res);
+ } else {
+ setFailure(x);
}
}
@@ -178,12 +204,10 @@
return result;
}
- @Override
Throwable exception() {
return (exc != CANCELLED) ? exc : null;
}
- @Override
V value() {
return result;
}
@@ -204,33 +228,6 @@
if (haveResult)
return false; // already completed
- // A shutdown of the channel group will close all channels and
- // shutdown the executor. To ensure that the completion handler
- // is executed we queue the task while holding the lock.
- if (handler != null) {
- prepareForWait();
- Runnable cancelTask = new Runnable() {
- public void run() {
- while (!haveResult) {
- try {
- latch.await();
- } catch (InterruptedException ignore) { }
- }
- handler.cancelled(attachment());
- }
- };
- AsynchronousChannel ch = channel();
- if (ch instanceof Groupable) {
- ((Groupable)ch).group().executeOnPooledThread(cancelTask);
- } else {
- if (ch instanceof AsynchronousFileChannelImpl) {
- ((AsynchronousFileChannelImpl)ch).executor().execute(cancelTask);
- } else {
- throw new AssertionError("Should not get here");
- }
- }
- }
-
// notify channel
if (channel() instanceof Cancellable)
((Cancellable)channel()).onCancel(this);
@@ -249,7 +246,7 @@
} catch (IOException ignore) { }
}
- // release waiters (this also releases the invoker)
+ // release waiters
if (latch != null)
latch.countDown();
return true;
--- a/jdk/src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/SimpleAsynchronousDatagramChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -317,51 +317,71 @@
return new WrappedMembershipKey(this, key);
}
- @Override
- public <A> Future<Integer> send(ByteBuffer src,
- SocketAddress target,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ private <A> Future<Integer> implSend(ByteBuffer src,
+ SocketAddress target,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- if (timeout < 0L)
- throw new IllegalArgumentException("Negative timeout");
- if (unit == null)
- throw new NullPointerException();
-
- CompletedFuture<Integer,A> result;
+ int n = 0;
+ Throwable exc = null;
try {
- int n = dc.send(src, target);
- result = CompletedFuture.withResult(this, n, attachment);
+ n = dc.send(src, target);
} catch (IOException ioe) {
- result = CompletedFuture.withFailure(this, ioe, attachment);
+ exc = ioe;
}
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withResult(n, exc);
+ Invoker.invoke(this, handler, attachment, n, exc);
+ return null;
+ }
+
+ @Override
+ public Future<Integer> send(ByteBuffer src, SocketAddress target) {
+ return implSend(src, target, null, null);
}
@Override
- public <A> Future<Integer> write(ByteBuffer src,
- long timeout,
- TimeUnit unit,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ public <A> void send(ByteBuffer src,
+ SocketAddress target,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implSend(src, target, attachment, handler);
+ }
+
+ private <A> Future<Integer> implWrite(ByteBuffer src,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
- if (timeout < 0L)
- throw new IllegalArgumentException("Negative timeout");
- if (unit == null)
- throw new NullPointerException();
-
- CompletedFuture<Integer,A> result;
+ int n = 0;
+ Throwable exc = null;
try {
- int n = dc.write(src);
- result = CompletedFuture.withResult(this, n, attachment);
+ n = dc.write(src);
} catch (IOException ioe) {
- result = CompletedFuture.withFailure(this, ioe, attachment);
+ exc = ioe;
}
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withResult(n, exc);
+ Invoker.invoke(this, handler, attachment, n, exc);
+ return null;
+
+ }
+
+ @Override
+ public Future<Integer> write(ByteBuffer src) {
+ return implWrite(src, null, null);
+ }
+
+ @Override
+ public <A> void write(ByteBuffer src,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implWrite(src, attachment, handler);
}
/**
@@ -390,12 +410,11 @@
}
}
- @Override
- public <A> Future<SocketAddress> receive(final ByteBuffer dst,
- final long timeout,
- final TimeUnit unit,
- A attachment,
- final CompletionHandler<SocketAddress,? super A> handler)
+ private <A> Future<SocketAddress> implReceive(final ByteBuffer dst,
+ final long timeout,
+ final TimeUnit unit,
+ A attachment,
+ final CompletionHandler<SocketAddress,? super A> handler)
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
@@ -406,10 +425,11 @@
// complete immediately if channel closed
if (!isOpen()) {
- CompletedFuture<SocketAddress,A> result = CompletedFuture.withFailure(this,
- new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
}
final AccessControlContext acc = (System.getSecurityManager() == null) ?
@@ -471,7 +491,7 @@
x = new AsynchronousCloseException();
result.setFailure(x);
}
- Invoker.invokeUnchecked(handler, result);
+ Invoker.invokeUnchecked(result);
}
};
try {
@@ -483,11 +503,27 @@
}
@Override
- public <A> Future<Integer> read(final ByteBuffer dst,
- final long timeout,
- final TimeUnit unit,
- A attachment,
- final CompletionHandler<Integer,? super A> handler)
+ public Future<SocketAddress> receive(ByteBuffer dst) {
+ return implReceive(dst, 0L, TimeUnit.MILLISECONDS, null, null);
+ }
+
+ @Override
+ public <A> void receive(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<SocketAddress,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implReceive(dst, timeout, unit, attachment, handler);
+ }
+
+ private <A> Future<Integer> implRead(final ByteBuffer dst,
+ final long timeout,
+ final TimeUnit unit,
+ A attachment,
+ final CompletionHandler<Integer,? super A> handler)
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
@@ -495,18 +531,20 @@
throw new IllegalArgumentException("Negative timeout");
if (unit == null)
throw new NullPointerException();
- // another thread may disconnect before read is initiated
- if (!dc.isConnected())
- throw new NotYetConnectedException();
// complete immediately if channel closed
if (!isOpen()) {
- CompletedFuture<Integer,A> result = CompletedFuture.withFailure(this,
- new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
}
+ // another thread may disconnect before read is initiated
+ if (!dc.isConnected())
+ throw new NotYetConnectedException();
+
final PendingFuture<Integer,A> result =
new PendingFuture<Integer,A>(this, handler, attachment);
Runnable task = new Runnable() {
@@ -563,7 +601,7 @@
x = new AsynchronousCloseException();
result.setFailure(x);
}
- Invoker.invokeUnchecked(handler, result);
+ Invoker.invokeUnchecked(result);
}
};
try {
@@ -575,6 +613,23 @@
}
@Override
+ public Future<Integer> read(ByteBuffer dst) {
+ return implRead(dst, 0L, TimeUnit.MILLISECONDS, null, null);
+ }
+
+ @Override
+ public <A> void read(ByteBuffer dst,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
+ {
+ if (handler == null)
+ throw new NullPointerException("'handler' is null");
+ implRead(dst, timeout, unit, attachment, handler);
+ }
+
+ @Override
public AsynchronousDatagramChannel bind(SocketAddress local)
throws IOException
{
--- a/jdk/src/share/classes/sun/nio/ch/SimpleAsynchronousFileChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/share/classes/sun/nio/ch/SimpleAsynchronousFileChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -50,9 +50,6 @@
// Used to make native read and write calls
private static final FileDispatcher nd = new FileDispatcherImpl();
- // indicates if the associated thread pool is the default thread pool
- private final boolean isDefaultExecutor;
-
// Thread-safe set of IDs of native threads, for signalling
private final NativeThreadSet threads = new NativeThreadSet(2);
@@ -60,11 +57,9 @@
SimpleAsynchronousFileChannelImpl(FileDescriptor fdObj,
boolean reading,
boolean writing,
- ExecutorService executor,
- boolean isDefaultexecutor)
+ ExecutorService executor)
{
super(fdObj, reading, writing, executor);
- this.isDefaultExecutor = isDefaultexecutor;
}
public static AsynchronousFileChannel open(FileDescriptor fdo,
@@ -73,17 +68,9 @@
ThreadPool pool)
{
// Executor is either default or based on pool parameters
- ExecutorService executor;
- boolean isDefaultexecutor;
- if (pool == null) {
- executor = DefaultExecutorHolder.defaultExecutor;
- isDefaultexecutor = true;
- } else {
- executor = pool.executor();
- isDefaultexecutor = false;
- }
- return new SimpleAsynchronousFileChannelImpl(fdo,
- reading, writing, executor, isDefaultexecutor);
+ ExecutorService executor = (pool == null) ?
+ DefaultExecutorHolder.defaultExecutor : pool.executor();
+ return new SimpleAsynchronousFileChannelImpl(fdo, reading, writing, executor);
}
@Override
@@ -114,16 +101,6 @@
// close file
nd.close(fdObj);
-
- // shutdown executor if specific to this channel
- if (!isDefaultExecutor) {
- AccessController.doPrivileged(new PrivilegedAction<Void>() {
- public Void run() {
- executor.shutdown();
- return null;
- }
- });
- }
}
@Override
@@ -194,11 +171,11 @@
}
@Override
- public <A> Future<FileLock> lock(final long position,
- final long size,
- final boolean shared,
- A attachment,
- final CompletionHandler<FileLock,? super A> handler)
+ <A> Future<FileLock> implLock(final long position,
+ final long size,
+ final boolean shared,
+ final A attachment,
+ final CompletionHandler<FileLock,? super A> handler)
{
if (shared && !reading)
throw new NonReadableChannelException();
@@ -208,16 +185,19 @@
// add to lock table
final FileLockImpl fli = addToFileLockTable(position, size, shared);
if (fli == null) {
- CompletedFuture<FileLock,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invokeIndirectly(handler, result, executor);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invokeIndirectly(handler, attachment, null, exc, executor);
+ return null;
}
- final PendingFuture<FileLock,A> result =
- new PendingFuture<FileLock,A>(this, handler, attachment);
+ final PendingFuture<FileLock,A> result = (handler == null) ?
+ new PendingFuture<FileLock,A>(this) : null;
Runnable task = new Runnable() {
public void run() {
+ Throwable exc = null;
+
int ti = threads.add();
try {
int n;
@@ -226,31 +206,36 @@
do {
n = nd.lock(fdObj, true, position, size, shared);
} while ((n == FileDispatcher.INTERRUPTED) && isOpen());
- if (n == FileDispatcher.LOCKED && isOpen()) {
- result.setResult(fli);
- } else {
+ if (n != FileDispatcher.LOCKED || !isOpen()) {
throw new AsynchronousCloseException();
}
} catch (IOException x) {
removeFromFileLockTable(fli);
if (!isOpen())
x = new AsynchronousCloseException();
- result.setFailure(x);
+ exc = x;
} finally {
end();
}
} finally {
threads.remove(ti);
}
- Invoker.invokeUnchecked(handler, result);
+ if (handler == null) {
+ result.setResult(fli, exc);
+ } else {
+ Invoker.invokeUnchecked(handler, attachment, fli, exc);
+ }
}
};
+ boolean executed = false;
try {
executor.execute(task);
- } catch (RejectedExecutionException ree) {
- // rollback
- removeFromFileLockTable(fli);
- throw new ShutdownChannelGroupException();
+ executed = true;
+ } finally {
+ if (!executed) {
+ // rollback
+ removeFromFileLockTable(fli);
+ }
}
return result;
}
@@ -301,10 +286,10 @@
}
@Override
- public <A> Future<Integer> read(final ByteBuffer dst,
- final long position,
- A attachment,
- final CompletionHandler<Integer,? super A> handler)
+ <A> Future<Integer> implRead(final ByteBuffer dst,
+ final long position,
+ final A attachment,
+ final CompletionHandler<Integer,? super A> handler)
{
if (position < 0)
throw new IllegalArgumentException("Negative position");
@@ -315,55 +300,52 @@
// complete immediately if channel closed or no space remaining
if (!isOpen() || (dst.remaining() == 0)) {
- CompletedFuture<Integer,A> result;
- if (isOpen()) {
- result = CompletedFuture.withResult(this, 0, attachment);
- } else {
- result = CompletedFuture.withFailure(this,
- new ClosedChannelException(), attachment);
- }
- Invoker.invokeIndirectly(handler, result, executor);
- return result;
+ Throwable exc = (isOpen()) ? null : new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withResult(0, exc);
+ Invoker.invokeIndirectly(handler, attachment, 0, exc, executor);
+ return null;
}
- final PendingFuture<Integer,A> result =
- new PendingFuture<Integer,A>(this, handler, attachment);
+ final PendingFuture<Integer,A> result = (handler == null) ?
+ new PendingFuture<Integer,A>(this) : null;
Runnable task = new Runnable() {
public void run() {
+ int n = 0;
+ Throwable exc = null;
+
int ti = threads.add();
try {
begin();
- int n;
do {
n = IOUtil.read(fdObj, dst, position, nd, null);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n < 0 && !isOpen())
throw new AsynchronousCloseException();
- result.setResult(n);
} catch (IOException x) {
if (!isOpen())
x = new AsynchronousCloseException();
- result.setFailure(x);
+ exc = x;
} finally {
end();
threads.remove(ti);
}
- Invoker.invokeUnchecked(handler, result);
+ if (handler == null) {
+ result.setResult(n, exc);
+ } else {
+ Invoker.invokeUnchecked(handler, attachment, n, exc);
+ }
}
};
- try {
- executor.execute(task);
- } catch (RejectedExecutionException ree) {
- throw new ShutdownChannelGroupException();
- }
+ executor.execute(task);
return result;
}
@Override
- public <A> Future<Integer> write(final ByteBuffer src,
- final long position,
- A attachment,
- final CompletionHandler<Integer,? super A> handler)
+ <A> Future<Integer> implWrite(final ByteBuffer src,
+ final long position,
+ final A attachment,
+ final CompletionHandler<Integer,? super A> handler)
{
if (position < 0)
throw new IllegalArgumentException("Negative position");
@@ -372,47 +354,44 @@
// complete immediately if channel is closed or no bytes remaining
if (!isOpen() || (src.remaining() == 0)) {
- CompletedFuture<Integer,A> result;
- if (isOpen()) {
- result = CompletedFuture.withResult(this, 0, attachment);
- } else {
- result = CompletedFuture.withFailure(this,
- new ClosedChannelException(), attachment);
- }
- Invoker.invokeIndirectly(handler, result, executor);
- return result;
+ Throwable exc = (isOpen()) ? null : new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withResult(0, exc);
+ Invoker.invokeIndirectly(handler, attachment, 0, exc, executor);
+ return null;
}
- final PendingFuture<Integer,A> result =
- new PendingFuture<Integer,A>(this, handler, attachment);
+ final PendingFuture<Integer,A> result = (handler == null) ?
+ new PendingFuture<Integer,A>(this) : null;
Runnable task = new Runnable() {
public void run() {
+ int n = 0;
+ Throwable exc = null;
+
int ti = threads.add();
try {
begin();
- int n;
do {
n = IOUtil.write(fdObj, src, position, nd, null);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n < 0 && !isOpen())
throw new AsynchronousCloseException();
- result.setResult(n);
} catch (IOException x) {
if (!isOpen())
x = new AsynchronousCloseException();
- result.setFailure(x);
+ exc = x;
} finally {
end();
threads.remove(ti);
}
- Invoker.invokeUnchecked(handler, result);
+ if (handler == null) {
+ result.setResult(n, exc);
+ } else {
+ Invoker.invokeUnchecked(handler, attachment, n, exc);
+ }
}
};
- try {
- executor.execute(task);
- } catch (RejectedExecutionException ree) {
- throw new ShutdownChannelGroupException();
- }
+ executor.execute(task);
return result;
}
}
--- a/jdk/src/solaris/classes/sun/nio/ch/EPollPort.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/solaris/classes/sun/nio/ch/EPollPort.java Sun Aug 23 12:53:45 2009 +0100
@@ -248,12 +248,13 @@
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
+ final boolean isPooledThread = (myGroupAndInvokeCount != null);
boolean replaceMe = false;
Event ev;
try {
for (;;) {
// reset invoke count
- if (myGroupAndInvokeCount != null)
+ if (isPooledThread)
myGroupAndInvokeCount.resetInvokeCount();
try {
@@ -289,7 +290,7 @@
// process event
try {
- ev.channel().onEvent(ev.events());
+ ev.channel().onEvent(ev.events(), isPooledThread);
} catch (Error x) {
replaceMe = true; throw x;
} catch (RuntimeException x) {
--- a/jdk/src/solaris/classes/sun/nio/ch/Port.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/solaris/classes/sun/nio/ch/Port.java Sun Aug 23 12:53:45 2009 +0100
@@ -49,7 +49,7 @@
* Implemented by clients registered with this port.
*/
interface PollableChannel extends Closeable {
- void onEvent(int events);
+ void onEvent(int events, boolean mayInvokeDirect);
}
// maps fd to "pollable" channel
@@ -121,7 +121,7 @@
final Object attachForeignChannel(final Channel channel, FileDescriptor fd) {
int fdVal = IOUtil.fdVal(fd);
register(fdVal, new PollableChannel() {
- public void onEvent(int events) { }
+ public void onEvent(int events, boolean mayInvokeDirect) { }
public void close() throws IOException {
channel.close();
}
--- a/jdk/src/solaris/classes/sun/nio/ch/SolarisEventPort.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/solaris/classes/sun/nio/ch/SolarisEventPort.java Sun Aug 23 12:53:45 2009 +0100
@@ -151,12 +151,13 @@
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
+ final boolean isPooledThread = (myGroupAndInvokeCount != null);
boolean replaceMe = false;
long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT);
try {
for (;;) {
// reset invoke count
- if (myGroupAndInvokeCount != null)
+ if (isPooledThread)
myGroupAndInvokeCount.resetInvokeCount();
// wait for I/O completion event
@@ -205,7 +206,7 @@
if (ch != null) {
replaceMe = true;
// no need to translate events
- ch.onEvent(events);
+ ch.onEvent(events, isPooledThread);
}
}
} finally {
--- a/jdk/src/solaris/classes/sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/solaris/classes/sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -59,10 +59,13 @@
private final Object updateLock = new Object();
// pending accept
- private PendingFuture<AsynchronousSocketChannel,Object> pendingAccept;
+ private boolean acceptPending;
+ private CompletionHandler<AsynchronousSocketChannel,Object> acceptHandler;
+ private Object acceptAttachment;
+ private PendingFuture<AsynchronousSocketChannel,Object> acceptFuture;
// context for permission check when security manager set
- private AccessControlContext acc;
+ private AccessControlContext acceptAcc;
UnixAsynchronousServerSocketChannelImpl(Port port)
@@ -83,15 +86,6 @@
port.register(fdVal, this);
}
- // returns and clears the result of a pending accept
- private PendingFuture<AsynchronousSocketChannel,Object> grabPendingAccept() {
- synchronized (updateLock) {
- PendingFuture<AsynchronousSocketChannel,Object> result = pendingAccept;
- pendingAccept = null;
- return result;
- }
- }
-
@Override
void implClose() throws IOException {
// remove the mapping
@@ -101,17 +95,27 @@
nd.close(fd);
// if there is a pending accept then complete it
- final PendingFuture<AsynchronousSocketChannel,Object> result =
- grabPendingAccept();
- if (result != null) {
- // discard the stack trace as otherwise it may appear that implClose
- // has thrown the exception.
- AsynchronousCloseException x = new AsynchronousCloseException();
- x.setStackTrace(new StackTraceElement[0]);
- result.setFailure(x);
+ CompletionHandler<AsynchronousSocketChannel,Object> handler;
+ Object att;
+ PendingFuture<AsynchronousSocketChannel,Object> future;
+ synchronized (updateLock) {
+ if (!acceptPending)
+ return; // no pending accept
+ acceptPending = false;
+ handler = acceptHandler;
+ att = acceptAttachment;
+ future = acceptFuture;
+ }
+ // discard the stack trace as otherwise it may appear that implClose
+ // has thrown the exception.
+ AsynchronousCloseException x = new AsynchronousCloseException();
+ x.setStackTrace(new StackTraceElement[0]);
+ if (handler == null) {
+ future.setFailure(x);
+ } else {
// invoke by submitting task rather than directly
- Invoker.invokeIndirectly(result.handler(), result);
+ Invoker.invokeIndirectly(this, handler, att, null, x);
}
}
@@ -124,15 +128,17 @@
* Invoked by event handling thread when listener socket is polled
*/
@Override
- public void onEvent(int events) {
- PendingFuture<AsynchronousSocketChannel,Object> result = grabPendingAccept();
- if (result == null)
- return; // may have been grabbed by asynchronous close
+ public void onEvent(int events, boolean mayInvokeDirect) {
+ synchronized (updateLock) {
+ if (!acceptPending)
+ return; // may have been grabbed by asynchronous close
+ acceptPending = false;
+ }
// attempt to accept connection
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
- boolean accepted = false;
+ Throwable exc = null;
try {
begin();
int n = accept0(this.fd, newfd, isaa);
@@ -140,49 +146,52 @@
// spurious wakeup, is this possible?
if (n == IOStatus.UNAVAILABLE) {
synchronized (updateLock) {
- this.pendingAccept = result;
+ acceptPending = true;
}
port.startPoll(fdVal, Port.POLLIN);
return;
}
- // connection accepted
- accepted = true;
-
} catch (Throwable x) {
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
- enableAccept();
- result.setFailure(x);
+ exc = x;
} finally {
end();
}
// Connection accepted so finish it when not holding locks.
AsynchronousSocketChannel child = null;
- if (accepted) {
+ if (exc == null) {
try {
- child = finishAccept(newfd, isaa[0], acc);
- enableAccept();
- result.setResult(child);
+ child = finishAccept(newfd, isaa[0], acceptAcc);
} catch (Throwable x) {
- enableAccept();
if (!(x instanceof IOException) && !(x instanceof SecurityException))
x = new IOException(x);
- result.setFailure(x);
+ exc = x;
}
}
- // if an async cancel has already cancelled the operation then
- // close the new channel so as to free resources
- if (child != null && result.isCancelled()) {
- try {
- child.close();
- } catch (IOException ignore) { }
+ // copy field befores accept is re-renabled
+ CompletionHandler<AsynchronousSocketChannel,Object> handler = acceptHandler;
+ Object att = acceptAttachment;
+ PendingFuture<AsynchronousSocketChannel,Object> future = acceptFuture;
+
+ // re-enable accepting and invoke handler
+ enableAccept();
+
+ if (handler == null) {
+ future.setResult(child, exc);
+ // if an async cancel has already cancelled the operation then
+ // close the new channel so as to free resources
+ if (child != null && future.isCancelled()) {
+ try {
+ child.close();
+ } catch (IOException ignore) { }
+ }
+ } else {
+ Invoker.invoke(this, handler, att, child, exc);
}
-
- // invoke the handler
- Invoker.invoke(result.handler(), result);
}
/**
@@ -234,16 +243,18 @@
}
@Override
- @SuppressWarnings("unchecked")
- public <A> Future<AsynchronousSocketChannel> accept(A attachment,
- final CompletionHandler<AsynchronousSocketChannel,? super A> handler)
+ Future<AsynchronousSocketChannel> implAccept(Object att,
+ CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
// complete immediately if channel is closed
if (!isOpen()) {
- CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invokeIndirectly(handler, result);
- return result;
+ Throwable e = new ClosedChannelException();
+ if (handler == null) {
+ return CompletedFuture.withFailure(e);
+ } else {
+ Invoker.invoke(this, handler, att, null, e);
+ return null;
+ }
}
if (localAddress == null)
throw new NotYetBoundException();
@@ -258,25 +269,31 @@
throw new AcceptPendingException();
// attempt accept
- AbstractFuture<AsynchronousSocketChannel,A> result = null;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
+ Throwable exc = null;
try {
begin();
int n = accept0(this.fd, newfd, isaa);
if (n == IOStatus.UNAVAILABLE) {
- // no connection to accept
- result = new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment);
// need calling context when there is security manager as
// permission check may be done in a different thread without
// any application call frames on the stack
- synchronized (this) {
- this.acc = (System.getSecurityManager() == null) ?
+ PendingFuture<AsynchronousSocketChannel,Object> result = null;
+ synchronized (updateLock) {
+ if (handler == null) {
+ this.acceptHandler = null;
+ result = new PendingFuture<AsynchronousSocketChannel,Object>(this);
+ this.acceptFuture = result;
+ } else {
+ this.acceptHandler = handler;
+ this.acceptAttachment = att;
+ }
+ this.acceptAcc = (System.getSecurityManager() == null) ?
null : AccessController.getContext();
- this.pendingAccept =
- (PendingFuture<AsynchronousSocketChannel,Object>)result;
+ this.acceptPending = true;
}
// register for connections
@@ -287,25 +304,30 @@
// accept failed
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
- result = CompletedFuture.withFailure(this, x, attachment);
+ exc = x;
} finally {
end();
}
- // connection accepted immediately
- if (result == null) {
+ AsynchronousSocketChannel child = null;
+ if (exc == null) {
+ // connection accepted immediately
try {
- AsynchronousSocketChannel ch = finishAccept(newfd, isaa[0], null);
- result = CompletedFuture.withResult(this, ch, attachment);
+ child = finishAccept(newfd, isaa[0], null);
} catch (Throwable x) {
- result = CompletedFuture.withFailure(this, x, attachment);
+ exc = x;
}
}
- // re-enable accepting and invoke handler
+ // re-enable accepting before invoking handler
enableAccept();
- Invoker.invokeIndirectly(handler, result);
- return result;
+
+ if (handler == null) {
+ return CompletedFuture.withResult(child, exc);
+ } else {
+ Invoker.invokeIndirectly(this, handler, att, child, exc);
+ return null;
+ }
}
// -- Native methods --
--- a/jdk/src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -61,20 +61,33 @@
private final Object updateLock = new Object();
// pending connect (updateLock)
- private PendingFuture<Void,Object> pendingConnect;
+ private boolean connectPending;
+ private CompletionHandler<Void,Object> connectHandler;
+ private Object connectAttachment;
+ private PendingFuture<Void,Object> connectFuture;
- // pending remote address (statLock)
+ // pending remote address (stateLock)
private SocketAddress pendingRemote;
// pending read (updateLock)
+ private boolean readPending;
+ private boolean isScatteringRead;
+ private ByteBuffer readBuffer;
private ByteBuffer[] readBuffers;
- private boolean scatteringRead;
- private PendingFuture<Number,Object> pendingRead;
+ private CompletionHandler<Number,Object> readHandler;
+ private Object readAttachment;
+ private PendingFuture<Number,Object> readFuture;
+ private Future<?> readTimer;
// pending write (updateLock)
+ private boolean writePending;
+ private boolean isGatheringWrite;
+ private ByteBuffer writeBuffer;
private ByteBuffer[] writeBuffers;
- private boolean gatheringWrite;
- private PendingFuture<Number,Object> pendingWrite;
+ private CompletionHandler<Number,Object> writeHandler;
+ private Object writeAttachment;
+ private PendingFuture<Number,Object> writeFuture;
+ private Future<?> writeTimer;
UnixAsynchronousSocketChannelImpl(Port port)
@@ -128,43 +141,36 @@
private void updateEvents() {
assert Thread.holdsLock(updateLock);
int events = 0;
- if (pendingRead != null)
+ if (readPending)
events |= Port.POLLIN;
- if (pendingConnect != null || pendingWrite != null)
+ if (connectPending || writePending)
events |= Port.POLLOUT;
if (events != 0)
port.startPoll(fdVal, events);
}
- /**
- * Invoked by event handler thread when file descriptor is polled
- */
- @Override
- public void onEvent(int events) {
- boolean readable = (events & Port.POLLIN) > 0;
- boolean writable = (events & Port.POLLOUT) > 0;
- if ((events & (Port.POLLERR | Port.POLLHUP)) > 0) {
- readable = true;
- writable = true;
- }
-
- PendingFuture<Void,Object> connectResult = null;
- PendingFuture<Number,Object> readResult = null;
- PendingFuture<Number,Object> writeResult = null;
+ // invoke to finish read and/or write operations
+ private void finish(boolean mayInvokeDirect,
+ boolean readable,
+ boolean writable)
+ {
+ boolean finishRead = false;
+ boolean finishWrite = false;
+ boolean finishConnect = false;
// map event to pending result
synchronized (updateLock) {
- if (readable && (pendingRead != null)) {
- readResult = pendingRead;
- pendingRead = null;
+ if (readable && this.readPending) {
+ this.readPending = false;
+ finishRead = true;
}
if (writable) {
- if (pendingWrite != null) {
- writeResult = pendingWrite;
- pendingWrite = null;
- } else if (pendingConnect != null) {
- connectResult = pendingConnect;
- pendingConnect = null;
+ if (this.writePending) {
+ this.writePending = false;
+ finishWrite = true;
+ } else if (this.connectPending) {
+ this.connectPending = false;
+ finishConnect = true;
}
}
}
@@ -172,36 +178,32 @@
// complete the I/O operation. Special case for when channel is
// ready for both reading and writing. In that case, submit task to
// complete write if write operation has a completion handler.
- if (readResult != null) {
- if (writeResult != null)
- finishWrite(writeResult, false);
- finishRead(readResult, true);
+ if (finishRead) {
+ if (finishWrite)
+ finishWrite(false);
+ finishRead(mayInvokeDirect);
return;
}
- if (writeResult != null) {
- finishWrite(writeResult, true);
+ if (finishWrite) {
+ finishWrite(mayInvokeDirect);
}
- if (connectResult != null) {
- finishConnect(connectResult, true);
+ if (finishConnect) {
+ finishConnect(mayInvokeDirect);
}
}
- // returns and clears the result of a pending read
- PendingFuture<Number,Object> grabPendingRead() {
- synchronized (updateLock) {
- PendingFuture<Number,Object> result = pendingRead;
- pendingRead = null;
- return result;
+ /**
+ * Invoked by event handler thread when file descriptor is polled
+ */
+ @Override
+ public void onEvent(int events, boolean mayInvokeDirect) {
+ boolean readable = (events & Port.POLLIN) > 0;
+ boolean writable = (events & Port.POLLOUT) > 0;
+ if ((events & (Port.POLLERR | Port.POLLHUP)) > 0) {
+ readable = true;
+ writable = true;
}
- }
-
- // returns and clears the result of a pending write
- PendingFuture<Number,Object> grabPendingWrite() {
- synchronized (updateLock) {
- PendingFuture<Number,Object> result = pendingWrite;
- pendingWrite = null;
- return result;
- }
+ finish(mayInvokeDirect, readable, writable);
}
@Override
@@ -213,26 +215,7 @@
nd.close(fd);
// All outstanding I/O operations are required to fail
- final PendingFuture<Void,Object> readyToConnect;
- final PendingFuture<Number,Object> readyToRead;
- final PendingFuture<Number,Object> readyToWrite;
- synchronized (updateLock) {
- readyToConnect = pendingConnect;
- pendingConnect = null;
- readyToRead = pendingRead;
- pendingRead = null;
- readyToWrite = pendingWrite;
- pendingWrite = null;
- }
- if (readyToConnect != null) {
- finishConnect(readyToConnect, false);
- }
- if (readyToRead != null) {
- finishRead(readyToRead, false);
- }
- if (readyToWrite != null) {
- finishWrite(readyToWrite, false);
- }
+ finish(false, true, true);
}
@Override
@@ -240,9 +223,9 @@
if (task.getContext() == OpType.CONNECT)
killConnect();
if (task.getContext() == OpType.READ)
- killConnect();
+ killReading();
if (task.getContext() == OpType.WRITE)
- killConnect();
+ killWriting();
}
// -- connect --
@@ -255,15 +238,12 @@
}
}
- private void finishConnect(PendingFuture<Void,Object> result,
- boolean invokeDirect)
- {
+ private void finishConnect(boolean mayInvokeDirect) {
Throwable e = null;
try {
begin();
checkConnect(fdVal);
setConnected();
- result.setResult(null);
} catch (Throwable x) {
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
@@ -276,26 +256,38 @@
try {
close();
} catch (IOException ignore) { }
- result.setFailure(e);
}
- if (invokeDirect) {
- Invoker.invoke(result.handler(), result);
+
+
+ // invoke handler and set result
+ CompletionHandler<Void,Object> handler = connectHandler;
+ Object att = connectAttachment;
+ PendingFuture<Void,Object> future = connectFuture;
+ if (handler == null) {
+ future.setResult(null, e);
} else {
- Invoker.invokeIndirectly(result.handler(), result);
+ if (mayInvokeDirect) {
+ Invoker.invokeUnchecked(handler, att, null, e);
+ } else {
+ Invoker.invokeIndirectly(this, handler, att, null, e);
+ }
}
}
@Override
@SuppressWarnings("unchecked")
- public <A> Future<Void> connect(SocketAddress remote,
- A attachment,
- CompletionHandler<Void,? super A> handler)
+ <A> Future<Void> implConnect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler)
{
if (!isOpen()) {
- CompletedFuture<Void,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable e = new ClosedChannelException();
+ if (handler == null) {
+ return CompletedFuture.withFailure(e);
+ } else {
+ Invoker.invoke(this, handler, attachment, null, e);
+ return null;
+ }
}
InetSocketAddress isa = Net.checkAddress(remote);
@@ -317,7 +309,6 @@
notifyBeforeTcpConnect = (localAddress == null);
}
- AbstractFuture<Void,A> result = null;
Throwable e = null;
try {
begin();
@@ -327,15 +318,21 @@
int n = Net.connect(fd, isa.getAddress(), isa.getPort());
if (n == IOStatus.UNAVAILABLE) {
// connection could not be established immediately
- result = new PendingFuture<Void,A>(this, handler, attachment, OpType.CONNECT);
+ PendingFuture<Void,A> result = null;
synchronized (updateLock) {
- this.pendingConnect = (PendingFuture<Void,Object>)result;
+ if (handler == null) {
+ result = new PendingFuture<Void,A>(this, OpType.CONNECT);
+ this.connectFuture = (PendingFuture<Void,Object>)result;
+ } else {
+ this.connectHandler = (CompletionHandler<Void,Object>)handler;
+ this.connectAttachment = attachment;
+ }
+ this.connectPending = true;
updateEvents();
}
return result;
}
setConnected();
- result = CompletedFuture.withResult(this, null, attachment);
} catch (Throwable x) {
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
@@ -349,84 +346,111 @@
try {
close();
} catch (IOException ignore) { }
- result = CompletedFuture.withFailure(this, e, attachment);
}
-
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null) {
+ return CompletedFuture.withResult(null, e);
+ } else {
+ Invoker.invoke(this, handler, attachment, null, e);
+ return null;
+ }
}
// -- read --
- @SuppressWarnings("unchecked")
- private void finishRead(PendingFuture<Number,Object> result,
- boolean invokeDirect)
- {
+ private void finishRead(boolean mayInvokeDirect) {
int n = -1;
- PendingFuture<Number,Object> pending = null;
+ Throwable exc = null;
+
+ // copy fields as we can't access them after reading is re-enabled.
+ boolean scattering = isScatteringRead;
+ CompletionHandler<Number,Object> handler = readHandler;
+ Object att = readAttachment;
+ PendingFuture<Number,Object> future = readFuture;
+ Future<?> timeout = readTimer;
+
try {
begin();
- ByteBuffer[] dsts = readBuffers;
- if (dsts.length == 1) {
- n = IOUtil.read(fd, dsts[0], -1, nd, null);
+ if (scattering) {
+ n = (int)IOUtil.read(fd, readBuffers, nd);
} else {
- n = (int)IOUtil.read(fd, dsts, nd);
+ n = IOUtil.read(fd, readBuffer, -1, nd, null);
}
if (n == IOStatus.UNAVAILABLE) {
// spurious wakeup, is this possible?
- pending = result;
+ synchronized (updateLock) {
+ readPending = true;
+ }
return;
}
- // allow buffer(s) to be GC'ed.
- readBuffers = null;
+ // allow objects to be GC'ed.
+ this.readBuffer = null;
+ this.readBuffers = null;
+ this.readAttachment = null;
// allow another read to be initiated
- boolean wasScatteringRead = scatteringRead;
enableReading();
- // result is Integer or Long
- if (wasScatteringRead) {
- result.setResult(Long.valueOf(n));
- } else {
- result.setResult(Integer.valueOf(n));
- }
-
} catch (Throwable x) {
enableReading();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
- result.setFailure(x);
+ exc = x;
} finally {
// restart poll in case of concurrent write
synchronized (updateLock) {
- if (pending != null)
- this.pendingRead = pending;
updateEvents();
}
end();
}
- if (invokeDirect) {
- Invoker.invoke(result.handler(), result);
+ // cancel the associated timer
+ if (timeout != null)
+ timeout.cancel(false);
+
+ // create result
+ Number result = (exc != null) ? null : (scattering) ?
+ (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
+
+ // invoke handler or set result
+ if (handler == null) {
+ future.setResult(result, exc);
} else {
- Invoker.invokeIndirectly(result.handler(), result);
+ if (mayInvokeDirect) {
+ Invoker.invokeUnchecked(handler, att, result, exc);
+ } else {
+ Invoker.invokeIndirectly(this, handler, att, result, exc);
+ }
}
}
private Runnable readTimeoutTask = new Runnable() {
public void run() {
- PendingFuture<Number,Object> result = grabPendingRead();
- if (result == null)
- return; // already completed
+ CompletionHandler<Number,Object> handler = null;
+ Object att = null;
+ PendingFuture<Number,Object> future = null;
+
+ synchronized (updateLock) {
+ if (!readPending)
+ return;
+ readPending = false;
+ handler = readHandler;
+ att = readAttachment;
+ future = readFuture;
+ }
// kill further reading before releasing waiters
enableReading(true);
- // set completed and invoke handler
- result.setFailure(new InterruptedByTimeoutException());
- Invoker.invokeIndirectly(result.handler(), result);
+ // invoke handler or set result
+ Exception exc = new InterruptedByTimeoutException();
+ if (handler == null) {
+ future.setFailure(exc);
+ } else {
+ AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
+ Invoker.invokeIndirectly(ch, handler, att, null, exc);
+ }
}
};
@@ -435,8 +459,9 @@
*/
@Override
@SuppressWarnings("unchecked")
- <V extends Number,A> Future<V> readImpl(ByteBuffer[] dsts,
- boolean isScatteringRead,
+ <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
@@ -450,144 +475,178 @@
boolean invokeDirect = false;
boolean attemptRead = false;
if (!disableSynchronousRead) {
- myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
- invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
- attemptRead = (handler == null) || invokeDirect ||
- !port.isFixedThreadPool(); // okay to attempt read with user thread pool
+ if (handler == null) {
+ attemptRead = true;
+ } else {
+ myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
+ invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
+ // okay to attempt read with user thread pool
+ attemptRead = invokeDirect || !port.isFixedThreadPool();
+ }
}
- AbstractFuture<V,A> result;
+ int n = IOStatus.UNAVAILABLE;
+ Throwable exc = null;
+ boolean pending = false;
+
try {
begin();
- int n;
if (attemptRead) {
if (isScatteringRead) {
n = (int)IOUtil.read(fd, dsts, nd);
} else {
- n = IOUtil.read(fd, dsts[0], -1, nd, null);
+ n = IOUtil.read(fd, dst, -1, nd, null);
}
- } else {
- n = IOStatus.UNAVAILABLE;
}
if (n == IOStatus.UNAVAILABLE) {
- result = new PendingFuture<V,A>(this, handler, attachment, OpType.READ);
-
- // update evetns so that read will complete asynchronously
+ PendingFuture<V,A> result = null;
synchronized (updateLock) {
+ this.isScatteringRead = isScatteringRead;
+ this.readBuffer = dst;
this.readBuffers = dsts;
- this.scatteringRead = isScatteringRead;
- this.pendingRead = (PendingFuture<Number,Object>)result;
+ if (handler == null) {
+ this.readHandler = null;
+ result = new PendingFuture<V,A>(this, OpType.READ);
+ this.readFuture = (PendingFuture<Number,Object>)result;
+ this.readAttachment = null;
+ } else {
+ this.readHandler = (CompletionHandler<Number,Object>)handler;
+ this.readAttachment = attachment;
+ this.readFuture = null;
+ }
+ if (timeout > 0L) {
+ this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
+ }
+ this.readPending = true;
updateEvents();
}
-
- // schedule timeout
- if (timeout > 0L) {
- Future<?> timeoutTask =
- port.schedule(readTimeoutTask, timeout, unit);
- ((PendingFuture<V,A>)result).setTimeoutTask(timeoutTask);
- }
+ pending = true;
return result;
}
-
- // data available
- enableReading();
-
- // result type is Long or Integer
- if (isScatteringRead) {
- result = (CompletedFuture<V,A>)CompletedFuture
- .withResult(this, Long.valueOf(n), attachment);
- } else {
- result = (CompletedFuture<V,A>)CompletedFuture
- .withResult(this, Integer.valueOf(n), attachment);
- }
} catch (Throwable x) {
- enableReading();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
- result = CompletedFuture.withFailure(this, x, attachment);
+ exc = x;
} finally {
+ if (!pending)
+ enableReading();
end();
}
- if (invokeDirect) {
- Invoker.invokeDirect(myGroupAndInvokeCount, handler, result);
+ Number result = (exc != null) ? null : (isScatteringRead) ?
+ (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
+
+ // read completed immediately
+ if (handler != null) {
+ if (invokeDirect) {
+ Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
+ } else {
+ Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
+ }
+ return null;
} else {
- Invoker.invokeIndirectly(handler, result);
+ return CompletedFuture.withResult((V)result, exc);
}
- return result;
}
// -- write --
- private void finishWrite(PendingFuture<Number,Object> result,
- boolean invokeDirect)
- {
- PendingFuture<Number,Object> pending = null;
+ private void finishWrite(boolean mayInvokeDirect) {
+ int n = -1;
+ Throwable exc = null;
+
+ // copy fields as we can't access them after reading is re-enabled.
+ boolean gathering = this.isGatheringWrite;
+ CompletionHandler<Number,Object> handler = this.writeHandler;
+ Object att = this.writeAttachment;
+ PendingFuture<Number,Object> future = this.writeFuture;
+ Future<?> timer = this.writeTimer;
+
try {
begin();
- ByteBuffer[] srcs = writeBuffers;
- int n;
- if (srcs.length == 1) {
- n = IOUtil.write(fd, srcs[0], -1, nd, null);
+ if (gathering) {
+ n = (int)IOUtil.write(fd, writeBuffers, nd);
} else {
- n = (int)IOUtil.write(fd, srcs, nd);
+ n = IOUtil.write(fd, writeBuffer, -1, nd, null);
}
if (n == IOStatus.UNAVAILABLE) {
// spurious wakeup, is this possible?
- pending = result;
+ synchronized (updateLock) {
+ writePending = true;
+ }
return;
}
- // allow buffer(s) to be GC'ed.
- writeBuffers = null;
+ // allow objects to be GC'ed.
+ this.writeBuffer = null;
+ this.writeBuffers = null;
+ this.writeAttachment = null;
// allow another write to be initiated
- boolean wasGatheringWrite = gatheringWrite;
enableWriting();
- // result is a Long or Integer
- if (wasGatheringWrite) {
- result.setResult(Long.valueOf(n));
- } else {
- result.setResult(Integer.valueOf(n));
- }
-
} catch (Throwable x) {
enableWriting();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
- result.setFailure(x);
+ exc = x;
} finally {
- // restart poll in case of concurrent read
- synchronized (this) {
- if (pending != null)
- this.pendingWrite = pending;
+ // restart poll in case of concurrent write
+ synchronized (updateLock) {
updateEvents();
}
end();
}
- if (invokeDirect) {
- Invoker.invoke(result.handler(), result);
+
+ // cancel the associated timer
+ if (timer != null)
+ timer.cancel(false);
+
+ // create result
+ Number result = (exc != null) ? null : (gathering) ?
+ (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
+
+ // invoke handler or set result
+ if (handler == null) {
+ future.setResult(result, exc);
} else {
- Invoker.invokeIndirectly(result.handler(), result);
+ if (mayInvokeDirect) {
+ Invoker.invokeUnchecked(handler, att, result, exc);
+ } else {
+ Invoker.invokeIndirectly(this, handler, att, result, exc);
+ }
}
}
private Runnable writeTimeoutTask = new Runnable() {
public void run() {
- PendingFuture<Number,Object> result = grabPendingWrite();
- if (result == null)
- return; // already completed
+ CompletionHandler<Number,Object> handler = null;
+ Object att = null;
+ PendingFuture<Number,Object> future = null;
+
+ synchronized (updateLock) {
+ if (!writePending)
+ return;
+ writePending = false;
+ handler = writeHandler;
+ att = writeAttachment;
+ future = writeFuture;
+ }
// kill further writing before releasing waiters
enableWriting(true);
- // set completed and invoke handler
- result.setFailure(new InterruptedByTimeoutException());
- Invoker.invokeIndirectly(result.handler(), result);
+ // invoke handler or set result
+ Exception exc = new InterruptedByTimeoutException();
+ if (handler != null) {
+ Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
+ handler, att, null, exc);
+ } else {
+ future.setFailure(exc);
+ }
}
};
@@ -596,8 +655,9 @@
*/
@Override
@SuppressWarnings("unchecked")
- <V extends Number,A> Future<V> writeImpl(ByteBuffer[] srcs,
- boolean isGatheringWrite,
+ <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
@@ -607,66 +667,72 @@
Invoker.getGroupAndInvokeCount();
boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
boolean attemptWrite = (handler == null) || invokeDirect ||
- !port.isFixedThreadPool(); // okay to attempt read with user thread pool
+ !port.isFixedThreadPool(); // okay to attempt write with user thread pool
- AbstractFuture<V,A> result;
+ int n = IOStatus.UNAVAILABLE;
+ Throwable exc = null;
+ boolean pending = false;
+
try {
begin();
- int n;
if (attemptWrite) {
if (isGatheringWrite) {
n = (int)IOUtil.write(fd, srcs, nd);
} else {
- n = IOUtil.write(fd, srcs[0], -1, nd, null);
+ n = IOUtil.write(fd, src, -1, nd, null);
}
- } else {
- n = IOStatus.UNAVAILABLE;
}
if (n == IOStatus.UNAVAILABLE) {
- result = new PendingFuture<V,A>(this, handler, attachment, OpType.WRITE);
-
- // update evetns so that read will complete asynchronously
+ PendingFuture<V,A> result = null;
synchronized (updateLock) {
+ this.isGatheringWrite = isGatheringWrite;
+ this.writeBuffer = src;
this.writeBuffers = srcs;
- this.gatheringWrite = isGatheringWrite;
- this.pendingWrite = (PendingFuture<Number,Object>)result;
+ if (handler == null) {
+ this.writeHandler = null;
+ result = new PendingFuture<V,A>(this, OpType.WRITE);
+ this.writeFuture = (PendingFuture<Number,Object>)result;
+ this.writeAttachment = null;
+ } else {
+ this.writeHandler = (CompletionHandler<Number,Object>)handler;
+ this.writeAttachment = attachment;
+ this.writeFuture = null;
+ }
+ if (timeout > 0L) {
+ this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
+ }
+ this.writePending = true;
updateEvents();
}
-
- // schedule timeout
- if (timeout > 0L) {
- Future<?> timeoutTask =
- port.schedule(writeTimeoutTask, timeout, unit);
- ((PendingFuture<V,A>)result).setTimeoutTask(timeoutTask);
- }
+ pending = true;
return result;
}
-
- // data available
- enableWriting();
- if (isGatheringWrite) {
- result = (CompletedFuture<V,A>)CompletedFuture
- .withResult(this, Long.valueOf(n), attachment);
- } else {
- result = (CompletedFuture<V,A>)CompletedFuture
- .withResult(this, Integer.valueOf(n), attachment);
- }
} catch (Throwable x) {
- enableWriting();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
- result = CompletedFuture.withFailure(this, x, attachment);
+ exc = x;
} finally {
+ if (!pending)
+ enableWriting();
end();
}
- if (invokeDirect) {
- Invoker.invokeDirect(myGroupAndInvokeCount, handler, result);
+
+ Number result = (exc != null) ? null : (isGatheringWrite) ?
+ (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
+
+ // write completed immediately
+ if (handler != null) {
+ if (invokeDirect) {
+ Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
+ } else {
+ Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
+ }
+ return null;
} else {
- Invoker.invokeIndirectly(handler, result);
+ return CompletedFuture.withResult((V)result, exc);
}
- return result;
}
// -- Native methods --
--- a/jdk/src/windows/classes/sun/nio/ch/Iocp.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/windows/classes/sun/nio/ch/Iocp.java Sun Aug 23 12:53:45 2009 +0100
@@ -34,6 +34,8 @@
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.security.AccessController;
+import sun.security.action.GetPropertyAction;
import sun.misc.Unsafe;
/**
@@ -44,6 +46,7 @@
class Iocp extends AsynchronousChannelGroupImpl {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long INVALID_HANDLE_VALUE = -1L;
+ private static final boolean supportsThreadAgnosticIo;
// maps completion key to channel
private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
@@ -87,6 +90,13 @@
<V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
}
+ /**
+ * Indicates if this operating system supports thread agnostic I/O.
+ */
+ static boolean supportsThreadAgnosticIo() {
+ return supportsThreadAgnosticIo;
+ }
+
// release all resources
void implClose() {
synchronized (this) {
@@ -216,8 +226,9 @@
} while ((key == 0) || keyToChannel.containsKey(key));
// associate with I/O completion port
- if (handle != 0L)
+ if (handle != 0L) {
createIoCompletionPort(handle, port, key, 0);
+ }
// setup mapping
keyToChannel.put(key, ch);
@@ -282,7 +293,7 @@
/**
* Invoked if the I/O operation completes successfully.
*/
- public void completed(int bytesTransferred);
+ public void completed(int bytesTransferred, boolean canInvokeDirect);
/**
* Invoked if the I/O operation fails.
@@ -305,6 +316,7 @@
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
+ boolean canInvokeDirect = (myGroupAndInvokeCount != null);
CompletionStatus ioResult = new CompletionStatus();
boolean replaceMe = false;
@@ -382,7 +394,7 @@
ResultHandler rh = (ResultHandler)result.getContext();
replaceMe = true; // (if error/exception then replace thread)
if (error == 0) {
- rh.completed(ioResult.bytesTransferred());
+ rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
} else {
rh.failed(error, translateErrorToIOException(error));
}
@@ -433,5 +445,11 @@
static {
Util.load();
initIDs();
+
+ // thread agnostic I/O on Vista/2008 or newer
+ String osversion = AccessController.doPrivileged(
+ new GetPropertyAction("os.version"));
+ String vers[] = osversion.split("\\.");
+ supportsThreadAgnosticIo = Integer.parseInt(vers[0]) >= 6;
}
}
--- a/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -146,10 +146,12 @@
// waits until all I/O operations have completed
ioCache.close();
- // disassociate from port and shutdown thread pool if not default
+ // disassociate from port
iocp.disassociate(completionKey);
+
+ // for the non-default group close the port
if (!isDefaultIocp)
- iocp.shutdown();
+ iocp.detachFromThreadPool();
}
@Override
@@ -258,14 +260,18 @@
}
// invoke completion handler
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
@Override
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
// release waiters and invoke completion handler
result.setResult(fli);
- Invoker.invoke(result.handler(), result);
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
}
@Override
@@ -279,16 +285,16 @@
} else {
result.setFailure(new AsynchronousCloseException());
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
}
@Override
- public <A> Future<FileLock> lock(long position,
- long size,
- boolean shared,
- A attachment,
- CompletionHandler<FileLock,? super A> handler)
+ <A> Future<FileLock> implLock(final long position,
+ final long size,
+ final boolean shared,
+ A attachment,
+ final CompletionHandler<FileLock,? super A> handler)
{
if (shared && !reading)
throw new NonReadableChannelException();
@@ -298,10 +304,11 @@
// add to lock table
FileLockImpl fli = addToFileLockTable(position, size, shared);
if (fli == null) {
- CompletedFuture<FileLock,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
}
// create Future and task that will be invoked to acquire lock
@@ -310,13 +317,20 @@
LockTask lockTask = new LockTask<A>(position, fli, result);
result.setContext(lockTask);
- // initiate I/O (can only be done from thread in thread pool)
- try {
- Invoker.invokeOnThreadInThreadPool(this, lockTask);
- } catch (ShutdownChannelGroupException e) {
- // rollback
- removeFromFileLockTable(fli);
- throw e;
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ lockTask.run();
+ } else {
+ boolean executed = false;
+ try {
+ Invoker.invokeOnThreadInThreadPool(this, lockTask);
+ executed = true;
+ } finally {
+ if (!executed) {
+ // rollback
+ removeFromFileLockTable(fli);
+ }
+ }
}
return result;
}
@@ -461,14 +475,14 @@
releaseBufferIfSubstituted();
// invoke completion handler
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
* Executed when the I/O has completed
*/
@Override
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
updatePosition(bytesTransferred);
// return direct buffer to cache if substituted
@@ -476,14 +490,18 @@
// release waiters and invoke completion handler
result.setResult(bytesTransferred);
- Invoker.invoke(result.handler(), result);
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
}
@Override
public void failed(int error, IOException x) {
// if EOF detected asynchronously then it is reported as error
if (error == ERROR_HANDLE_EOF) {
- completed(-1);
+ completed(-1, false);
} else {
// return direct buffer to cache if substituted
releaseBufferIfSubstituted();
@@ -494,16 +512,16 @@
} else {
result.setFailure(new AsynchronousCloseException());
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
}
}
@Override
- public <A> Future<Integer> read(ByteBuffer dst,
- long position,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ <A> Future<Integer> implRead(ByteBuffer dst,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
if (!reading)
throw new NonReadableChannelException();
@@ -514,10 +532,11 @@
// check if channel is closed
if (!isOpen()) {
- CompletedFuture<Integer,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
}
int pos = dst.position();
@@ -527,10 +546,10 @@
// no space remaining
if (rem == 0) {
- CompletedFuture<Integer,A> result =
- CompletedFuture.withResult(this, 0, attachment);
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withResult(0);
+ Invoker.invoke(this, handler, attachment, 0, null);
+ return null;
}
// create Future and task that initiates read
@@ -539,8 +558,12 @@
ReadTask readTask = new ReadTask<A>(dst, pos, rem, position, result);
result.setContext(readTask);
- // initiate I/O (can only be done from thread in thread pool)
- Invoker.invokeOnThreadInThreadPool(this, readTask);
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ readTask.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, readTask);
+ }
return result;
}
@@ -639,14 +662,14 @@
}
// invoke completion handler
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
* Executed when the I/O has completed
*/
@Override
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
updatePosition(bytesTransferred);
// return direct buffer to cache if substituted
@@ -654,7 +677,11 @@
// release waiters and invoke completion handler
result.setResult(bytesTransferred);
- Invoker.invoke(result.handler(), result);
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
}
@Override
@@ -668,15 +695,14 @@
} else {
result.setFailure(new AsynchronousCloseException());
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
}
- @Override
- public <A> Future<Integer> write(ByteBuffer src,
- long position,
- A attachment,
- CompletionHandler<Integer,? super A> handler)
+ <A> Future<Integer> implWrite(ByteBuffer src,
+ long position,
+ A attachment,
+ CompletionHandler<Integer,? super A> handler)
{
if (!writing)
throw new NonWritableChannelException();
@@ -685,10 +711,11 @@
// check if channel is closed
if (!isOpen()) {
- CompletedFuture<Integer,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
}
int pos = src.position();
@@ -698,10 +725,10 @@
// nothing to write
if (rem == 0) {
- CompletedFuture<Integer,A> result =
- CompletedFuture.withResult(this, 0, attachment);
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withResult(0);
+ Invoker.invoke(this, handler, attachment, 0, null);
+ return null;
}
// create Future and task to initiate write
@@ -710,8 +737,12 @@
WriteTask writeTask = new WriteTask<A>(src, pos, rem, position, result);
result.setContext(writeTask);
- // initiate I/O (can only be done from thread in thread pool)
- Invoker.invokeOnThreadInThreadPool(this, writeTask);
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ writeTask.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, writeTask);
+ }
return result;
}
--- a/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -113,14 +113,14 @@
/**
* Task to initiate accept operation and to handle result.
*/
- private class AcceptTask<A> implements Runnable, Iocp.ResultHandler {
+ private class AcceptTask implements Runnable, Iocp.ResultHandler {
private final WindowsAsynchronousSocketChannelImpl channel;
private final AccessControlContext acc;
- private final PendingFuture<AsynchronousSocketChannel,A> result;
+ private final PendingFuture<AsynchronousSocketChannel,Object> result;
AcceptTask(WindowsAsynchronousSocketChannelImpl channel,
AccessControlContext acc,
- PendingFuture<AsynchronousSocketChannel,A> result)
+ PendingFuture<AsynchronousSocketChannel,Object> result)
{
this.channel = channel;
this.acc = acc;
@@ -222,14 +222,14 @@
}
// invoke completion handler
- Invoker.invokeIndirectly(result.handler(), result);
+ Invoker.invokeIndirectly(result);
}
/**
* Executed when the I/O has completed
*/
@Override
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
try {
// connection accept after group has shutdown
if (iocp.isShutdown()) {
@@ -269,7 +269,7 @@
}
// invoke handler (but not directly)
- Invoker.invokeIndirectly(result.handler(), result);
+ Invoker.invokeIndirectly(result);
}
@Override
@@ -283,19 +283,20 @@
} else {
result.setFailure(new AsynchronousCloseException());
}
- Invoker.invokeIndirectly(result.handler(), result);
+ Invoker.invokeIndirectly(result);
}
}
@Override
- public <A> Future<AsynchronousSocketChannel> accept(A attachment,
- final CompletionHandler<AsynchronousSocketChannel,? super A> handler)
+ Future<AsynchronousSocketChannel> implAccept(Object attachment,
+ final CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
if (!isOpen()) {
- CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invokeIndirectly(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invokeIndirectly(this, handler, attachment, null, exc);
+ return null;
}
if (isAcceptKilled())
throw new RuntimeException("Accept not allowed due to cancellation");
@@ -319,10 +320,10 @@
end();
}
if (ioe != null) {
- CompletedFuture<AsynchronousSocketChannel,A> result =
- CompletedFuture.withFailure(this, ioe, attachment);
- Invoker.invokeIndirectly(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withFailure(ioe);
+ Invoker.invokeIndirectly(this, handler, attachment, null, ioe);
+ return null;
}
// need calling context when there is security manager as
@@ -331,20 +332,21 @@
AccessControlContext acc = (System.getSecurityManager() == null) ?
null : AccessController.getContext();
- PendingFuture<AsynchronousSocketChannel,A> result =
- new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment);
- AcceptTask task = new AcceptTask<A>(ch, acc, result);
+ PendingFuture<AsynchronousSocketChannel,Object> result =
+ new PendingFuture<AsynchronousSocketChannel,Object>(this, handler, attachment);
+ AcceptTask task = new AcceptTask(ch, acc, result);
result.setContext(task);
// check and set flag to prevent concurrent accepting
if (!accepting.compareAndSet(false, true))
throw new AcceptPendingException();
- // initiate accept. As I/O operations are tied to the initiating thread
- // then it will only be invoked direcly if this thread is in the thread
- // pool. If this thread is not in the thread pool when a task is
- // submitted to initiate the accept.
- Invoker.invokeOnThreadInThreadPool(this, task);
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ task.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, task);
+ }
return result;
}
--- a/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java Sun Aug 23 12:53:45 2009 +0100
@@ -250,14 +250,14 @@
closeChannel();
result.setFailure(toIOException(exc));
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
* Invoked by handler thread when connection established.
*/
@Override
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
Throwable exc = null;
try {
begin();
@@ -276,7 +276,11 @@
result.setFailure(toIOException(exc));
}
- Invoker.invoke(result.handler(), result);
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
}
/**
@@ -290,20 +294,21 @@
} else {
result.setFailure(new AsynchronousCloseException());
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
}
@Override
- public <A> Future<Void> connect(SocketAddress remote,
- A attachment,
- CompletionHandler<Void,? super A> handler)
+ <A> Future<Void> implConnect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler)
{
if (!isOpen()) {
- CompletedFuture<Void,A> result = CompletedFuture
- .withFailure(this, new ClosedChannelException(), attachment);
- Invoker.invoke(handler, result);
- return result;
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
}
InetSocketAddress isa = Net.checkAddress(remote);
@@ -337,10 +342,10 @@
try {
close();
} catch (IOException ignore) { }
- CompletedFuture<Void,A> result = CompletedFuture
- .withFailure(this, bindException, attachment);
- Invoker.invoke(handler, result);
- return result;
+ if (handler == null)
+ return CompletedFuture.withFailure(bindException);
+ Invoker.invoke(this, handler, attachment, null, bindException);
+ return null;
}
// setup task
@@ -349,8 +354,12 @@
ConnectTask task = new ConnectTask<A>(isa, result);
result.setContext(task);
- // initiate I/O (can only be done from thread in thread pool)
- Invoker.invokeOnThreadInThreadPool(this, task);
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ task.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, task);
+ }
return result;
}
@@ -514,7 +523,7 @@
}
// invoke completion handler
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
@@ -522,7 +531,7 @@
*/
@Override
@SuppressWarnings("unchecked")
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
if (bytesTransferred == 0) {
bytesTransferred = -1; // EOF
} else {
@@ -543,7 +552,11 @@
result.setResult((V)Integer.valueOf(bytesTransferred));
}
}
- Invoker.invoke(result.handler(), result);
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
}
@Override
@@ -561,7 +574,7 @@
enableReading();
result.setFailure(x);
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
@@ -579,13 +592,14 @@
}
// invoke handler without any locks
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
}
@Override
- <V extends Number,A> Future<V> readImpl(ByteBuffer[] bufs,
- boolean scatteringRead,
+ <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
@@ -594,7 +608,14 @@
// setup task
PendingFuture<V,A> result =
new PendingFuture<V,A>(this, handler, attachment);
- final ReadTask readTask = new ReadTask<V,A>(bufs, scatteringRead, result);
+ ByteBuffer[] bufs;
+ if (isScatteringRead) {
+ bufs = dsts;
+ } else {
+ bufs = new ByteBuffer[1];
+ bufs[0] = dst;
+ }
+ final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
result.setContext(readTask);
// schedule timeout
@@ -607,8 +628,12 @@
result.setTimeoutTask(timeoutTask);
}
- // initiate I/O (can only be done from thread in thread pool)
- Invoker.invokeOnThreadInThreadPool(this, readTask);
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ readTask.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, readTask);
+ }
return result;
}
@@ -710,7 +735,7 @@
}
@Override
- @SuppressWarnings("unchecked")
+ //@SuppressWarnings("unchecked")
public void run() {
long overlapped = 0L;
boolean prepared = false;
@@ -759,7 +784,7 @@
}
// invoke completion handler
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
@@ -767,7 +792,7 @@
*/
@Override
@SuppressWarnings("unchecked")
- public void completed(int bytesTransferred) {
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
updateBuffers(bytesTransferred);
// return direct buffer to cache if substituted
@@ -784,7 +809,11 @@
result.setResult((V)Integer.valueOf(bytesTransferred));
}
}
- Invoker.invoke(result.handler(), result);
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
}
@Override
@@ -802,7 +831,7 @@
enableWriting();
result.setFailure(x);
}
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
/**
@@ -820,13 +849,14 @@
}
// invoke handler without any locks
- Invoker.invoke(result.handler(), result);
+ Invoker.invoke(result);
}
}
@Override
- <V extends Number,A> Future<V> writeImpl(ByteBuffer[] bufs,
- boolean gatheringWrite,
+ <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
@@ -835,6 +865,13 @@
// setup task
PendingFuture<V,A> result =
new PendingFuture<V,A>(this, handler, attachment);
+ ByteBuffer[] bufs;
+ if (gatheringWrite) {
+ bufs = srcs;
+ } else {
+ bufs = new ByteBuffer[1];
+ bufs[0] = src;
+ }
final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
result.setContext(writeTask);
@@ -849,7 +886,12 @@
}
// initiate I/O (can only be done from thread in thread pool)
- Invoker.invokeOnThreadInThreadPool(this, writeTask);
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ writeTask.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, writeTask);
+ }
return result;
}
--- a/jdk/src/windows/native/sun/nio/ch/Iocp.c Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/src/windows/native/sun/nio/ch/Iocp.c Sun Aug 23 12:53:45 2009 +0100
@@ -58,6 +58,16 @@
completionStatus_overlapped = (*env)->GetFieldID(env, clazz, "overlapped", "J");
}
+JNIEXPORT jint JNICALL
+Java_sun_nio_ch_Iocp_osMajorVersion(JNIEnv* env, jclass this)
+{
+ OSVERSIONINFOEX ver;
+ ver.dwOSVersionInfoSize = sizeof(ver);
+ GetVersionEx((OSVERSIONINFO *) &ver);
+ return (ver.dwPlatformId == VER_PLATFORM_WIN32_NT) ?
+ (jint)(ver.dwMajorVersion) : (jint)0;
+}
+
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_Iocp_createIoCompletionPort(JNIEnv* env, jclass this,
jlong handle, jlong existingPort, jint completionKey, jint concurrency)
--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/GroupOfOne.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/GroupOfOne.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for AsynchronousChannelGroup
*/
@@ -50,8 +50,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
@@ -97,9 +95,6 @@
System.out.println("Read failed (expected)");
latch.countDown();
}
- public void cancelled(Void att) {
- throw new RuntimeException();
- }
});
// close channel or shutdown group
@@ -122,9 +117,6 @@
public void failed(Throwable exc, Void att) {
throw new RuntimeException(exc);
}
- public void cancelled(Void att) {
- throw new RuntimeException();
- }
});
latch.await();
--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Identity.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for AsynchronousChannelGroup
*/
@@ -90,14 +90,10 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
@@ -141,9 +137,6 @@
public void failed(Throwable exc, Integer groupId) {
fail(exc.getMessage());
}
- public void cancelled(Integer groupId) {
- fail("I/O operation was cancelled");
- }
});
// wait until
--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Restart.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Restart.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for AsynchronousChannelGroup
* @build Restart
* @run main/othervm -XX:-UseVMInterruptibleIO Restart
@@ -111,8 +111,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
// establish loopback connection which should cause completion
--- a/jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for AsynchronousChannelGroup
*/
@@ -52,8 +52,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
System.out.println("Listener created.");
@@ -97,8 +95,6 @@
}
public void failed(Throwable exc, AsynchronousSocketChannel ch) {
}
- public void cancelled(AsynchronousSocketChannel ch) {
- }
});
}
System.out.println("All read operations outstanding.");
--- a/jdk/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4527345
+ * @bug 4527345 6842687
* @summary Unit test for AsynchronousDatagramChannel
*/
@@ -72,8 +72,6 @@
}
public void failed (Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
Thread.sleep(2000);
sender.send(ByteBuffer.wrap(msg), sa);
@@ -88,8 +86,6 @@
public void failed (Throwable exc, Void att) {
exception.set(exc);
}
- public void cancelled(Void att) {
- }
});
Throwable result;
while ((result = exception.get()) == null) {
@@ -107,8 +103,6 @@
public void failed (Throwable exc, Void att) {
exception.set(exc);
}
- public void cancelled(Void att) {
- }
});
ch.close();
while ((result = exception.get()) == null) {
@@ -162,8 +156,6 @@
}
public void failed (Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
Thread.sleep(2000);
sender.send(ByteBuffer.wrap(msg), sa);
@@ -178,8 +170,6 @@
public void failed (Throwable exc, Void att) {
exception.set(exc);
}
- public void cancelled(Void att) {
- }
});
Throwable result;
while ((result = exception.get()) == null) {
@@ -197,8 +187,6 @@
public void failed (Throwable exc, Void att) {
exception.set(exc);
}
- public void cancelled(Void att) {
- }
});
ch.close();
while ((result = exception.get()) == null) {
@@ -246,8 +234,6 @@
}
public void failed (Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
l2.await(5, TimeUnit.SECONDS);
@@ -272,8 +258,6 @@
throw new RuntimeException(exc);
}
}
- public void cancelled(Void att) {
- }
});
l3.await(5, TimeUnit.SECONDS);
@@ -323,8 +307,6 @@
}
public void failed (Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
l2.await(5, TimeUnit.SECONDS);
@@ -340,7 +322,7 @@
reader.close();
}
- static void cancelAndCheck(Future<?> result, CountDownLatch latch)
+ static void cancelAndCheck(Future<?> result)
throws InterruptedException
{
boolean cancelled = result.cancel(false);
@@ -356,37 +338,22 @@
} catch (ExecutionException e) {
throw new RuntimeException("Should not fail");
}
-
- // make sure that completion handler is invoked
- latch.await();
}
// basic cancel tests
static void doCancelTests() throws Exception {
InetAddress lh = InetAddress.getLocalHost();
- // timed and non-timed receive
+ // receive
for (int i=0; i<2; i++) {
AsynchronousDatagramChannel ch =
AsynchronousDatagramChannel.open().bind(new InetSocketAddress(0));
- final CountDownLatch latch = new CountDownLatch(1);
- long timeout = (i == 0) ? 0L : 60L;
- Future<SocketAddress> remote = ch
- .receive(ByteBuffer.allocate(100), timeout, TimeUnit.SECONDS, (Void)null,
- new CompletionHandler<SocketAddress,Void>() {
- public void completed(SocketAddress source, Void att) {
- }
- public void failed (Throwable exc, Void att) {
- }
- public void cancelled(Void att) {
- latch.countDown();
- }
- });
- cancelAndCheck(remote, latch);
+ Future<SocketAddress> remote = ch.receive(ByteBuffer.allocate(100));
+ cancelAndCheck(remote);
ch.close();
}
- // timed and non-timed read
+ // read
for (int i=0; i<2; i++) {
AsynchronousDatagramChannel ch =
AsynchronousDatagramChannel.open().bind(new InetSocketAddress(0));
@@ -394,18 +361,8 @@
((InetSocketAddress)(ch.getLocalAddress())).getPort()));
final CountDownLatch latch = new CountDownLatch(1);
long timeout = (i == 0) ? 0L : 60L;
- Future<Integer> result = ch
- .read(ByteBuffer.allocate(100), timeout, TimeUnit.SECONDS, (Void)null,
- new CompletionHandler<Integer,Void>() {
- public void completed(Integer bytesRead, Void att) {
- }
- public void failed (Throwable exc, Void att) {
- }
- public void cancelled(Void att) {
- latch.countDown();
- }
- });
- cancelAndCheck(result, latch);
+ Future<Integer> result = ch.read(ByteBuffer.allocate(100));
+ cancelAndCheck(result);
ch.close();
}
}
--- a/jdk/test/java/nio/channels/AsynchronousFileChannel/Basic.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousFileChannel/Basic.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272 6822643 6830721
+ * @bug 4607272 6822643 6830721 6842687
* @summary Unit test for AsynchronousFileChannel
*/
@@ -195,8 +195,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
throw new RuntimeException("OverlappingFileLockException expected");
} catch (OverlappingFileLockException x) {
@@ -229,8 +227,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
// wait for handler to complete
@@ -318,8 +314,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
await(latch);
@@ -338,8 +332,41 @@
}
} finally {
ch.close();
+ executor.shutdown();
}
}
+
+
+ // test sharing a thread pool between many channels
+ ExecutorService executor = Executors
+ .newFixedThreadPool(1+rand.nextInt(10), threadFactory);
+ final int n = 50 + rand.nextInt(50);
+ AsynchronousFileChannel[] channels = new AsynchronousFileChannel[n];
+ try {
+ for (int i=0; i<n; i++) {
+ Set<StandardOpenOption> opts = EnumSet.of(WRITE);
+ channels[i] = AsynchronousFileChannel.open(file, opts, executor);
+ final CountDownLatch latch = new CountDownLatch(1);
+ channels[i].write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
+ public void completed(Integer result, Void att) {
+ latch.countDown();
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+ await(latch);
+
+ // close ~half the channels
+ if (rand.nextBoolean())
+ channels[i].close();
+ }
+ } finally {
+ // close remaining channels
+ for (int i=0; i<n; i++) {
+ if (channels[i] != null) channels[i].close();
+ }
+ executor.shutdown();
+ }
}
// exercise asynchronous close
@@ -409,17 +436,7 @@
.open(file, WRITE, SYNC);
// start write operation
- final CountDownLatch latch = new CountDownLatch(1);
- Future<Integer> res = ch.write(genBuffer(), 0L, (Void)null,
- new CompletionHandler<Integer,Void>() {
- public void completed(Integer result, Void att) {
- }
- public void failed(Throwable exc, Void att) {
- }
- public void cancelled(Void att) {
- latch.countDown();
- }
- });
+ Future<Integer> res = ch.write(genBuffer(), 0L);
// cancel operation
boolean cancelled = res.cancel(mayInterruptIfRunning);
@@ -456,10 +473,6 @@
throw new RuntimeException(x);
}
- // check that cancelled method is invoked
- if (cancelled)
- await(latch);
-
ch.close();
}
}
@@ -547,8 +560,6 @@
}
public void failed(Throwable exc, Long position) {
}
- public void cancelled(Long position) {
- }
});
// wait for writes to complete
@@ -574,8 +585,6 @@
}
public void failed(Throwable exc, Long position) {
}
- public void cancelled(Long position) {
- }
});
// wait for reads to complete
--- a/jdk/test/java/nio/channels/AsynchronousFileChannel/CustomThreadPool.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousFileChannel/CustomThreadPool.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for java.nio.channels.AsynchronousFileChannel
* @build CustomThreadPool MyThreadFactory
* @run main/othervm -Djava.nio.channels.DefaultThreadPool.threadFactory=MyThreadFactory CustomThreadPool
@@ -51,8 +51,6 @@
}
public void failed(Throwable exc, AtomicReference<Thread> invoker) {
}
- public void cancelled(AtomicReference<Thread> invoker) {
- }
});
Thread t;
while ((t = invoker.get()) == null) {
--- a/jdk/test/java/nio/channels/AsynchronousFileChannel/Lock.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousFileChannel/Lock.java Sun Aug 23 12:53:45 2009 +0100
@@ -23,7 +23,7 @@
/* @test
- * @bug 4607272 6814948
+ * @bug 4607272 6814948 6842687
* @summary Unit test for AsynchronousFileChannel#lock method
*/
@@ -97,7 +97,7 @@
slave.lock(0, 10, false);
// this VM acquires lock on non-overlapping range
- fl = ch.lock(10, 10, false, null, null).get();
+ fl = ch.lock(10, 10, false).get();
fl.release();
// done
--- a/jdk/test/java/nio/channels/AsynchronousServerSocketChannel/Basic.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousServerSocketChannel/Basic.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for AsynchronousServerSocketChannel
* @run main/timeout=180 Basic
*/
@@ -104,8 +104,6 @@
public void failed(Throwable exc, Void att) {
exception.set(exc);
}
- public void cancelled(Void att) {
- }
});
// check AcceptPendingException
--- a/jdk/test/java/nio/channels/AsynchronousSocketChannel/Basic.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousSocketChannel/Basic.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 4607272
+ * @bug 4607272 6842687
* @summary Unit test for AsynchronousSocketChannel
* @run main/timeout=600 Basic
*/
@@ -187,8 +187,6 @@
public void failed(Throwable exc, Void att) {
connectException.set(exc);
}
- public void cancelled(Void att) {
- }
});
while (connectException.get() == null) {
Thread.sleep(100);
@@ -289,8 +287,6 @@
public void failed(Throwable x, AsynchronousSocketChannel ch) {
writeException.set(x);
}
- public void cancelled(AsynchronousSocketChannel ch) {
- }
});
// give time for socket buffer to fill up.
@@ -330,18 +326,8 @@
SocketChannel peer = server.accept();
// start read operation
- final CountDownLatch latch = new CountDownLatch(1);
ByteBuffer buf = ByteBuffer.allocate(1);
- Future<Integer> res = ch.read(buf, (Void)null,
- new CompletionHandler<Integer,Void>() {
- public void completed(Integer result, Void att) {
- }
- public void failed(Throwable exc, Void att) {
- }
- public void cancelled(Void att) {
- latch.countDown();
- }
- });
+ Future<Integer> res = ch.read(buf);
// cancel operation
boolean cancelled = res.cancel(mayInterruptIfRunning);
@@ -362,8 +348,11 @@
} catch (CancellationException x) {
}
- // check that completion handler executed.
- latch.await();
+ // check that the cancel doesn't impact writing to the channel
+ if (!mayInterruptIfRunning) {
+ buf = ByteBuffer.wrap("a".getBytes());
+ ch.write(buf).get();
+ }
ch.close();
peer.close();
@@ -408,8 +397,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
latch.await();
@@ -460,8 +447,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
// trickle the writing
@@ -507,26 +492,24 @@
}
// scattering read that completes ascynhronously
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch l1 = new CountDownLatch(1);
ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
new CompletionHandler<Long,Void>() {
public void completed(Long result, Void att) {
long n = result;
if (n <= 0)
throw new RuntimeException("No bytes read");
- latch.countDown();
+ l1.countDown();
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
// write some bytes
sc.write(genBuffer());
// read should now complete
- latch.await();
+ l1.await();
// write more bytes
sc.write(genBuffer());
@@ -535,10 +518,20 @@
for (int i=0; i<dsts.length; i++) {
dsts[i].rewind();
}
- long n = ch
- .read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, null).get();
- if (n <= 0)
- throw new RuntimeException("No bytes read");
+
+ final CountDownLatch l2 = new CountDownLatch(1);
+ ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Long,Void>() {
+ public void completed(Long result, Void att) {
+ long n = result;
+ if (n <= 0)
+ throw new RuntimeException("No bytes read");
+ l2.countDown();
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+ l2.await();
ch.close();
sc.close();
@@ -574,8 +567,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
// read to EOF or buffer full
@@ -613,19 +604,29 @@
ch.connect(server.address()).get();
SocketChannel sc = server.accept();
+ // number of bytes written
+ final AtomicLong bytesWritten = new AtomicLong(0);
+
// write buffers (should complete immediately)
ByteBuffer[] srcs = genBuffers(1);
- long n = ch
- .write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, null).get();
- if (n <= 0)
- throw new RuntimeException("No bytes written");
+ final CountDownLatch l1 = new CountDownLatch(1);
+ ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Long,Void>() {
+ public void completed(Long result, Void att) {
+ long n = result;
+ if (n <= 0)
+ throw new RuntimeException("No bytes read");
+ bytesWritten.addAndGet(n);
+ l1.countDown();
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+ l1.await();
// set to true to signal that no more buffers should be written
final AtomicBoolean continueWriting = new AtomicBoolean(true);
- // number of bytes written
- final AtomicLong bytesWritten = new AtomicLong(n);
-
// write until socket buffer is full so as to create the conditions
// for when a write does not complete immediately
srcs = genBuffers(1);
@@ -644,8 +645,6 @@
}
public void failed(Throwable exc, Void att) {
}
- public void cancelled(Void att) {
- }
});
// give time for socket buffer to fill up.
@@ -658,7 +657,7 @@
ByteBuffer buf = ByteBuffer.allocateDirect(4096);
long total = 0L;
do {
- n = sc.read(buf);
+ int n = sc.read(buf);
if (n <= 0)
throw new RuntimeException("No bytes read");
buf.rewind();
@@ -714,15 +713,27 @@
System.out.println("-- timeout when reading --");
+ ByteBuffer dst = ByteBuffer.allocate(512);
+
+ final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
+
// this read should timeout
- ByteBuffer dst = ByteBuffer.allocate(512);
- try {
- ch.read(dst, 3, TimeUnit.SECONDS, (Void)null, null).get();
- throw new RuntimeException("Read did not timeout");
- } catch (ExecutionException x) {
- if (!(x.getCause() instanceof InterruptedByTimeoutException))
- throw new RuntimeException("InterruptedByTimeoutException expected");
+ ch.read(dst, 3, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Integer,Void>()
+ {
+ public void completed(Integer result, Void att) {
+ throw new RuntimeException("Should not complete");
+ }
+ public void failed(Throwable exc, Void att) {
+ readException.set(exc);
+ }
+ });
+ // wait for exception
+ while (readException.get() == null) {
+ Thread.sleep(100);
}
+ if (!(readException.get() instanceof InterruptedByTimeoutException))
+ throw new RuntimeException("InterruptedByTimeoutException expected");
// after a timeout then further reading should throw unspecified runtime exception
boolean exceptionThrown = false;
@@ -752,8 +763,6 @@
public void failed(Throwable exc, AsynchronousSocketChannel ch) {
writeException.set(exc);
}
- public void cancelled(AsynchronousSocketChannel ch) {
- }
});
// wait for exception
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/nio/channels/AsynchronousSocketChannel/DieBeforeComplete.java Sun Aug 23 12:53:45 2009 +0100
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/* @test
+ * @bug 6842687
+ * @summary Unit test for AsynchronousSocketChannel/AsynchronousServerSocketChannel
+ */
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.net.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Initiates I/O operation on a thread that terminates before the I/O completes.
+ */
+
+public class DieBeforeComplete {
+
+ public static void main(String[] args) throws Exception {
+ final AsynchronousServerSocketChannel listener =
+ AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
+
+ InetAddress lh = InetAddress.getLocalHost();
+ int port = ((InetSocketAddress) (listener.getLocalAddress())).getPort();
+ final SocketAddress sa = new InetSocketAddress(lh, port);
+
+ // -- accept --
+
+ // initiate accept in a thread that dies before connection is established
+ Future<AsynchronousSocketChannel> r1 =
+ initiateAndDie(new Task<AsynchronousSocketChannel>() {
+ public Future<AsynchronousSocketChannel> run() {
+ return listener.accept();
+ }});
+
+ // establish and accept connection
+ SocketChannel peer = SocketChannel.open(sa);
+ final AsynchronousSocketChannel channel = r1.get();
+
+ // --- read --
+
+ // initiate read in a thread that dies befores bytes are available
+ final ByteBuffer dst = ByteBuffer.allocate(100);
+ Future<Integer> r2 = initiateAndDie(new Task<Integer>() {
+ public Future<Integer> run() {
+ return channel.read(dst);
+ }});
+
+ // send bytes
+ peer.write(ByteBuffer.wrap("hello".getBytes()));
+ int nread = r2.get();
+ if (nread <= 0)
+ throw new RuntimeException("Should have read at least one byte");
+
+ // -- write --
+
+ // initiate writes in threads that dies
+ boolean completedImmediately;
+ Future<Integer> r3;
+ do {
+ final ByteBuffer src = ByteBuffer.wrap(new byte[10000]);
+ r3 = initiateAndDie(new Task<Integer>() {
+ public Future<Integer> run() {
+ return channel.write(src);
+ }});
+ try {
+ int nsent = r3.get(5, TimeUnit.SECONDS);
+ if (nsent <= 0)
+ throw new RuntimeException("Should have wrote at least one byte");
+ completedImmediately = true;
+ } catch (TimeoutException x) {
+ completedImmediately = false;
+ }
+ } while (completedImmediately);
+
+ // drain connection
+ peer.configureBlocking(false);
+ ByteBuffer src = ByteBuffer.allocateDirect(10000);
+ do {
+ src.clear();
+ nread = peer.read(src);
+ if (nread == 0) {
+ Thread.sleep(100);
+ nread = peer.read(src);
+ }
+ } while (nread > 0);
+
+ // write should complete now
+ int nsent = r3.get();
+ if (nsent <= 0)
+ throw new RuntimeException("Should have wrote at least one byte");
+ }
+
+ static interface Task<T> {
+ Future<T> run();
+ }
+
+ static <T> Future<T> initiateAndDie(final Task<T> task) {
+ final AtomicReference<Future<T>> result = new AtomicReference<Future<T>>();
+ Runnable r = new Runnable() {
+ public void run() {
+ result.set(task.run());
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ while (t.isAlive()) {
+ try {
+ t.join();
+ } catch (InterruptedException x) {
+ }
+ }
+ return result.get();
+ }
+}
--- a/jdk/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 6834246
+ * @bug 6834246 6842687
* @summary Stress test connections through the loopback interface
*/
@@ -114,8 +114,6 @@
exc.printStackTrace();
closeUnchecked(channel);
}
- public void cancelled(Void att) {
- }
});
}
@@ -156,8 +154,6 @@
exc.printStackTrace();
closeUnchecked(channel);
}
- public void cancelled(Void att) {
- }
});
}
--- a/jdk/test/java/nio/channels/FileChannel/ReleaseOnCloseDeadlock.java Sat Aug 22 17:40:18 2009 +0100
+++ b/jdk/test/java/nio/channels/FileChannel/ReleaseOnCloseDeadlock.java Sun Aug 23 12:53:45 2009 +0100
@@ -22,7 +22,7 @@
*/
/* @test
- * @bug 6543863
+ * @bug 6543863 6842687
* @summary Try to cause a deadlock between (Asynchronous)FileChannel.close
* and FileLock.release
*/
@@ -56,7 +56,7 @@
AsynchronousFileChannel ch = AsynchronousFileChannel.open(file, READ, WRITE);
for (int i=0; i<LOCK_COUNT; i++) {
try {
- locks[i] = ch.lock(i, 1, true, null, null).get();
+ locks[i] = ch.lock(i, 1, true).get();
} catch (InterruptedException x) {
throw new RuntimeException(x);
} catch (ExecutionException x) {