# HG changeset patch # User alanb # Date 1241461514 -3600 # Node ID 27b7f2d5e949a0ed3ff93b0113887474684c0abf # Parent a92617170304a354d295d84647dd80eeac3a9537 6834246: (ch) AsynchronousSocketChannel#write completes with wrong number of bytes written under load (win) Reviewed-by: sherman diff -r a92617170304 -r 27b7f2d5e949 jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java --- a/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java Fri May 01 12:06:14 2009 -0700 +++ b/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java Mon May 04 19:25:14 2009 +0100 @@ -475,49 +475,40 @@ // get an OVERLAPPED structure (from the cache or allocate) overlapped = ioCache.add(result); - // synchronize on result to allow this thread handle the case - // where the read completes immediately. - synchronized (result) { - int n = read0(handle, numBufs, readBufferArray, overlapped); - if (n == IOStatus.UNAVAILABLE) { - // I/O is pending - pending = true; - return; + // initiate read + int n = read0(handle, numBufs, readBufferArray, overlapped); + if (n == IOStatus.UNAVAILABLE) { + // I/O is pending + pending = true; + return; + } + if (n == IOStatus.EOF) { + // input shutdown + enableReading(); + if (scatteringRead) { + result.setResult((V)Long.valueOf(-1L)); + } else { + result.setResult((V)Integer.valueOf(-1)); } - // read completed immediately: - // 1. update buffer position - // 2. reset read flag - // 3. release waiters - if (n == 0) { - n = -1; - } else { - updateBuffers(n); - } - enableReading(); - - if (scatteringRead) { - result.setResult((V)Long.valueOf(n)); - } else { - result.setResult((V)Integer.valueOf(n)); - } + } else { + throw new InternalError("Read completed immediately"); } } catch (Throwable x) { - // failed to initiate read: - // 1. reset read flag - // 2. free resources - // 3. release waiters + // failed to initiate read + // reset read flag before releasing waiters enableReading(); - if (overlapped != 0L) - ioCache.remove(overlapped); if (x instanceof ClosedChannelException) x = new AsynchronousCloseException(); if (!(x instanceof IOException)) x = new IOException(x); result.setFailure(x); } finally { - if (prepared && !pending) { - // return direct buffer(s) to cache if substituted - releaseBuffers(); + // release resources if I/O not pending + if (!pending) { + if (overlapped != 0L) + ioCache.remove(overlapped); + if (prepared) + releaseBuffers(); } end(); } @@ -721,7 +712,6 @@ @Override @SuppressWarnings("unchecked") public void run() { - int n = -1; long overlapped = 0L; boolean prepared = false; boolean pending = false; @@ -736,56 +726,34 @@ // get an OVERLAPPED structure (from the cache or allocate) overlapped = ioCache.add(result); - - // synchronize on result to allow this thread handle the case - // where the read completes immediately. - synchronized (result) { - n = write0(handle, numBufs, writeBufferArray, overlapped); - if (n == IOStatus.UNAVAILABLE) { - // I/O is pending - pending = true; - return; - } - - enableWriting(); - - if (n == IOStatus.EOF) { - // special case for shutdown output - shutdown = true; - throw new ClosedChannelException(); - } - - // write completed immediately: - // 1. enable writing - // 2. update buffer position - // 3. release waiters - updateBuffers(n); - - // result is a Long or Integer - if (gatheringWrite) { - result.setResult((V)Long.valueOf(n)); - } else { - result.setResult((V)Integer.valueOf(n)); - } + int n = write0(handle, numBufs, writeBufferArray, overlapped); + if (n == IOStatus.UNAVAILABLE) { + // I/O is pending + pending = true; + return; } + if (n == IOStatus.EOF) { + // special case for shutdown output + shutdown = true; + throw new ClosedChannelException(); + } + // write completed immediately + throw new InternalError("Write completed immediately"); } catch (Throwable x) { + // write failed. Enable writing before releasing waiters. enableWriting(); - - // failed to initiate read: if (!shutdown && (x instanceof ClosedChannelException)) x = new AsynchronousCloseException(); if (!(x instanceof IOException)) x = new IOException(x); result.setFailure(x); - - // release resources - if (overlapped != 0L) - ioCache.remove(overlapped); - } finally { - if (prepared && !pending) { - // return direct buffer(s) to cache if substituted - releaseBuffers(); + // release resources if I/O not pending + if (!pending) { + if (overlapped != 0L) + ioCache.remove(overlapped); + if (prepared) + releaseBuffers(); } end(); } diff -r a92617170304 -r 27b7f2d5e949 jdk/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c --- a/jdk/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c Fri May 01 12:06:14 2009 -0700 +++ b/jdk/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c Mon May 04 19:25:14 2009 +0100 @@ -157,14 +157,13 @@ WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address); OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov); BOOL res; - DWORD nread = 0; DWORD flags = 0; ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED)); res = WSARecv(s, lpWsaBuf, (DWORD)count, - &nread, + NULL, &flags, lpOverlapped, NULL); @@ -175,17 +174,12 @@ return IOS_UNAVAILABLE; } if (error == WSAESHUTDOWN) { - return 0; // input shutdown + return IOS_EOF; // input shutdown } JNU_ThrowIOExceptionWithLastError(env, "WSARecv failed"); return IOS_THROWN; } - if (nread == 0) { - // Handle graceful close or bytes not yet available cases - // via completion port notification. - return IOS_UNAVAILABLE; - } - return (jint)nread; + return IOS_UNAVAILABLE; } JNIEXPORT jint JNICALL @@ -196,13 +190,12 @@ WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address); OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov); BOOL res; - DWORD nwritten; ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED)); res = WSASend(s, lpWsaBuf, (DWORD)count, - &nwritten, + NULL, 0, lpOverlapped, NULL); @@ -218,5 +211,5 @@ JNU_ThrowIOExceptionWithLastError(env, "WSASend failed"); return IOS_THROWN; } - return (jint)nwritten; + return IOS_UNAVAILABLE; } diff -r a92617170304 -r 27b7f2d5e949 jdk/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java Mon May 04 19:25:14 2009 +0100 @@ -0,0 +1,183 @@ +/* + * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* @test + * @bug 6834246 + * @summary Stress test connections through the loopback interface + */ + +import java.nio.ByteBuffer; +import java.net.*; +import java.nio.channels.*; +import java.util.Random; +import java.io.IOException; + +public class StressLoopback { + static final Random rand = new Random(); + + public static void main(String[] args) throws Exception { + // setup listener + AsynchronousServerSocketChannel listener = + AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0)); + int port =((InetSocketAddress)(listener.getLocalAddress())).getPort(); + InetAddress lh = InetAddress.getLocalHost(); + SocketAddress remote = new InetSocketAddress(lh, port); + + // create sources and sinks + int count = 2 + rand.nextInt(9); + Source[] source = new Source[count]; + Sink[] sink = new Sink[count]; + for (int i=0; i %d (%s)\n", + nwrote, nread, (failed) ? "FAIL" : "PASS"); + total += nwrote; + } + if (failed) + throw new RuntimeException("Test failed - see log for details"); + System.out.format("Total sent %d MB\n", total / (1024L * 1024L)); + } + + /** + * Writes bytes to a channel until "done". When done the channel is closed. + */ + static class Source { + private final AsynchronousByteChannel channel; + private final ByteBuffer sentBuffer; + private volatile long bytesSent; + private volatile boolean finished; + + Source(AsynchronousByteChannel channel) { + this.channel = channel; + int size = 1024 + rand.nextInt(10000); + this.sentBuffer = (rand.nextBoolean()) ? + ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); + } + + void start() { + sentBuffer.position(0); + sentBuffer.limit(sentBuffer.capacity()); + channel.write(sentBuffer, null, new CompletionHandler () { + public void completed(Integer nwrote, Void att) { + bytesSent += nwrote; + if (finished) { + closeUnchecked(channel); + } else { + sentBuffer.position(0); + sentBuffer.limit(sentBuffer.capacity()); + channel.write(sentBuffer, null, this); + } + } + public void failed(Throwable exc, Void att) { + exc.printStackTrace(); + closeUnchecked(channel); + } + public void cancelled(Void att) { + } + }); + } + + long finish() { + finished = true; + waitUntilClosed(channel); + return bytesSent; + } + } + + /** + * Read bytes from a channel until EOF is received. + */ + static class Sink { + private final AsynchronousByteChannel channel; + private final ByteBuffer readBuffer; + private volatile long bytesRead; + + Sink(AsynchronousByteChannel channel) { + this.channel = channel; + int size = 1024 + rand.nextInt(10000); + this.readBuffer = (rand.nextBoolean()) ? + ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); + } + + void start() { + channel.read(readBuffer, null, new CompletionHandler () { + public void completed(Integer nread, Void att) { + if (nread < 0) { + closeUnchecked(channel); + } else { + bytesRead += nread; + readBuffer.clear(); + channel.read(readBuffer, null, this); + } + } + public void failed(Throwable exc, Void att) { + exc.printStackTrace(); + closeUnchecked(channel); + } + public void cancelled(Void att) { + } + }); + } + + long finish() { + waitUntilClosed(channel); + return bytesRead; + } + } + + static void waitUntilClosed(Channel c) { + while (c.isOpen()) { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { } + } + } + + static void closeUnchecked(Channel c) { + try { + c.close(); + } catch (IOException ignore) { } + } +}