6863110: Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
Reviewed-by: jccollet
--- a/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java Thu Jul 23 14:06:51 2009 +0100
@@ -127,8 +127,6 @@
/* -- End of fields protected by stateLock -- */
- private SctpResultContainer commUpResultContainer; /* null */
-
/**
* Constructor for normal connecting sockets
*/
@@ -761,12 +759,6 @@
if (!ensureReceiveOpen())
return null;
- if (commUpResultContainer != null) {
- resultContainer = commUpResultContainer;
- commUpResultContainer = null;
- continue;
- }
-
int n = 0;
try {
begin();
@@ -778,7 +770,7 @@
}
do {
- n = receive(fdVal, buffer, resultContainer);
+ n = receive(fdVal, buffer, resultContainer, fromConnect);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
} finally {
receiverCleanup();
@@ -809,9 +801,9 @@
if (fromConnect) {
/* If we reach here, then it was connect that invoked
- * receive an received the COMM_UP. Save it and allow
- * the user handler to process it upon next receive. */
- commUpResultContainer = resultContainer;
+ * receive and received the COMM_UP. We have already
+ * handled the COMM_UP with the internal notification
+ * handler. Simply return. */
return null;
}
} /* receiveLock */
@@ -827,20 +819,21 @@
private int receive(int fd,
ByteBuffer dst,
- SctpResultContainer resultContainer)
+ SctpResultContainer resultContainer,
+ boolean peek)
throws IOException {
int pos = dst.position();
int lim = dst.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (dst instanceof DirectBuffer && rem > 0)
- return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
+ return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
/* Substitute a native buffer */
int newSize = Math.max(rem, 1);
ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
try {
- int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
+ int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
bb.flip();
if (n > 0 && rem > 0)
dst.put(bb);
@@ -854,10 +847,11 @@
SctpResultContainer resultContainer,
ByteBuffer bb,
int rem,
- int pos)
+ int pos,
+ boolean peek)
throws IOException
{
- int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem);
+ int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
if (n > 0)
bb.position(pos + n);
@@ -1089,7 +1083,7 @@
private static native void initIDs();
static native int receive0(int fd, SctpResultContainer resultContainer,
- long address, int length) throws IOException;
+ long address, int length, boolean peek) throws IOException;
static native int send0(int fd, long address, int length,
SocketAddress target, int assocId, int streamNumber,
--- a/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java Thu Jul 23 14:06:51 2009 +0100
@@ -31,6 +31,8 @@
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Iterator;
import java.util.Set;
import java.util.HashSet;
import java.util.HashMap;
@@ -702,7 +704,7 @@
int assocId = association.associationID();
Set<SocketAddress> addresses = null;
- try {
+ try {
addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
} catch (IOException unused) {
/* OK, determining connected addresses may not be possible
@@ -723,9 +725,11 @@
/* We cannot determine the connected addresses */
Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs =
addressMap.entrySet();
- for (java.util.Map.Entry<SocketAddress, Association> entry : addrAssocs) {
+ Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator();
+ while (iterator.hasNext()) {
+ Entry<SocketAddress, Association> entry = iterator.next();
if (entry.getValue().equals(association)) {
- addressMap.remove(entry.getKey());
+ iterator.remove();
}
}
}
@@ -957,7 +961,7 @@
int length)
throws IOException{
return SctpChannelImpl.receive0(fd, resultContainer, address,
- length);
+ length, false /*peek */);
}
private static int send0(int fd,
--- a/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c Thu Jul 23 14:06:51 2009 +0100
@@ -417,11 +417,11 @@
/*
* Class: sun_nio_ch_SctpChannelImpl
* Method: receive0
- * Signature: (ILsun/nio/ch/SctpResultContainer;JI)I
+ * Signature: (ILsun/nio/ch/SctpResultContainer;JIZ)I
*/
JNIEXPORT jint JNICALL Java_sun_nio_ch_SctpChannelImpl_receive0
(JNIEnv *env, jclass klass, jint fd, jobject resultContainerObj,
- jlong address, jint length) {
+ jlong address, jint length, jboolean peek) {
SOCKADDR sa;
int sa_len = sizeof(sa);
ssize_t rv = 0;
@@ -429,6 +429,7 @@
struct iovec iov[1];
struct msghdr msg[1];
char cbuf[CMSG_SPACE(sizeof (struct sctp_sndrcvinfo))];
+ int flags = peek == JNI_TRUE ? MSG_PEEK : 0;
/* Set up the msghdr structure for receiving */
memset(msg, 0, sizeof (*msg));
@@ -443,7 +444,7 @@
msg->msg_flags = 0;
do {
- if ((rv = recvmsg(fd, msg, 0)) < 0) {
+ if ((rv = recvmsg(fd, msg, flags)) < 0) {
if (errno == EWOULDBLOCK) {
return IOS_UNAVAILABLE;
} else if (errno == EINTR) {
@@ -473,7 +474,7 @@
memcpy(buf, addr, rv);
iov->iov_base = buf + rv;
iov->iov_len = NOTIFICATION_BUFFER_SIZE - rv;
- if ((rv = recvmsg(fd, msg, 0)) < 0) {
+ if ((rv = recvmsg(fd, msg, flags)) < 0) {
handleSocketError(env, errno);
return 0;
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java Thu Jul 23 14:06:51 2009 +0100
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2009 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/* @test
+ * @bug 6863110
+ * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
+ * @author chegar
+ */
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import com.sun.nio.sctp.AbstractNotificationHandler;
+import com.sun.nio.sctp.AssociationChangeNotification;
+import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
+import com.sun.nio.sctp.HandlerResult;
+import com.sun.nio.sctp.Notification;
+import com.sun.nio.sctp.SctpChannel;
+import com.sun.nio.sctp.SctpServerChannel;
+import com.sun.nio.sctp.ShutdownNotification;
+import static java.lang.System.out;
+import static java.lang.System.err;
+import static java.nio.channels.SelectionKey.OP_CONNECT;
+import static java.nio.channels.SelectionKey.OP_READ;
+
+public class CommUp {
+ static CountDownLatch acceptLatch = new CountDownLatch(1);
+ static final int TIMEOUT = 10000;
+
+ CommUpNotificationHandler clientHandler = new CommUpNotificationHandler();
+ CommUpNotificationHandler serverHandler = new CommUpNotificationHandler();
+ CommUpServer server;
+ Thread clientThread;
+
+ void test(String[] args) {
+ SocketAddress address = null;
+
+ if (!Util.isSCTPSupported()) {
+ out.println("SCTP protocol is not supported");
+ out.println("Test cannot be run");
+ return;
+ }
+
+ if (args.length == 2) {
+ /* requested to connecct to a specific address */
+ try {
+ int port = Integer.valueOf(args[1]);
+ address = new InetSocketAddress(args[0], port);
+ } catch (NumberFormatException nfe) {
+ err.println(nfe);
+ }
+ } else {
+ /* start server on local machine, default */
+ try {
+ server = new CommUpServer();
+ server.start();
+ address = server.address();
+ debug("Server started and listening on " + address);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ return;
+ }
+ }
+
+ /* store the main thread so that the server can interrupt it, if necessary */
+ clientThread = Thread.currentThread();
+
+ doClient(address);
+ }
+
+ void doClient(SocketAddress peerAddress) {
+ SctpChannel sc = null;
+ try {
+ debug("connecting to " + peerAddress);
+ sc = SctpChannel.open();
+ sc.configureBlocking(false);
+ check(sc.isBlocking() == false, "Should be in non-blocking mode");
+ sc.connect(peerAddress);
+
+ Selector selector = Selector.open();
+ SelectionKey selectiontKey = sc.register(selector, OP_CONNECT);
+
+ /* Expect two interest Ops */
+ boolean opConnectReceived = false;
+ boolean opReadReceived = false;
+ for (int z=0; z<2; z++) {
+ debug("select " + z);
+ int keysAdded = selector.select(TIMEOUT);
+ debug("returned " + keysAdded + " keys");
+ if (keysAdded > 0) {
+ Set<SelectionKey> keys = selector.selectedKeys();
+ Iterator<SelectionKey> i = keys.iterator();
+ while(i.hasNext()) {
+ SelectionKey sk = i.next();
+ i.remove();
+ SctpChannel readyChannel =
+ (SctpChannel)sk.channel();
+
+ /* OP_CONNECT */
+ if (sk.isConnectable()) {
+ /* some trivial checks */
+ check(opConnectReceived == false,
+ "should only received one OP_CONNECT");
+ check(opReadReceived == false,
+ "should not receive OP_READ before OP_CONNECT");
+ check(readyChannel.equals(sc),
+ "channels should be equal");
+ check(!sk.isAcceptable(),
+ "key should not be acceptable");
+ check(!sk.isReadable(),
+ "key should not be readable");
+ check(!sk.isWritable(),
+ "key should not be writable");
+
+ /* now process the OP_CONNECT */
+ opConnectReceived = true;
+ check((sk.interestOps() & OP_CONNECT) == OP_CONNECT,
+ "selection key interest ops should contain OP_CONNECT");
+ sk.interestOps(OP_READ);
+ check((sk.interestOps() & OP_CONNECT) != OP_CONNECT,
+ "selection key interest ops should not contain OP_CONNECT");
+ check(sc.finishConnect(),
+ "finishConnect should return true");
+ } /* OP_READ */
+ else if (sk.isReadable()) {
+ /* some trivial checks */
+ check(opConnectReceived == true,
+ "should receive one OP_CONNECT before OP_READ");
+ check(opReadReceived == false,
+ "should not receive OP_READ before OP_CONNECT");
+ check(readyChannel.equals(sc),
+ "channels should be equal");
+ check(!sk.isAcceptable(),
+ "key should not be acceptable");
+ check(sk.isReadable(),
+ "key should be readable");
+ check(!sk.isWritable(),
+ "key should not be writable");
+ check(!sk.isConnectable(),
+ "key should not be connectable");
+
+ /* now process the OP_READ */
+ opReadReceived = true;
+ selectiontKey.cancel();
+
+ /* try with small buffer to see if native
+ * implementation can handle this */
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+ readyChannel.receive(buffer, null, clientHandler);
+ check(clientHandler.receivedCommUp(),
+ "Client should have received COMM_UP");
+
+ /* dont close (or put anything on) the channel until
+ * we check that the server's accepted channel also
+ * received COMM_UP */
+ serverHandler.waitForCommUp();
+ } else {
+ fail("Unexpected selection key");
+ }
+ }
+ } else {
+ fail("Client selector returned 0 ready keys");
+ /* stop the server */
+ server.thread().interrupt();
+ }
+ } //for
+
+ } catch (IOException ioe) {
+ unexpected(ioe);
+ } catch (InterruptedException ie) {
+ unexpected(ie);
+ }
+ }
+
+ class CommUpServer implements Runnable
+ {
+ final InetSocketAddress serverAddr;
+ private SctpServerChannel ssc;
+ private Thread serverThread;
+
+ public CommUpServer() throws IOException {
+ ssc = SctpServerChannel.open().bind(null);
+ java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
+ if (addrs.isEmpty())
+ debug("addrs should not be empty");
+
+ serverAddr = (InetSocketAddress) addrs.iterator().next();
+ }
+
+ void start() {
+ serverThread = new Thread(this, "CommUpServer-" +
+ serverAddr.getPort());
+ serverThread.start();
+ }
+
+ InetSocketAddress address () {
+ return serverAddr;
+ }
+
+ Thread thread() {
+ return serverThread;
+ }
+
+ @Override
+ public void run() {
+ Selector selector = null;
+ SctpChannel sc = null;
+ SelectionKey readKey = null;
+ try {
+ sc = ssc.accept();
+ debug("accepted " + sc);
+
+ selector = Selector.open();
+ sc.configureBlocking(false);
+ check(sc.isBlocking() == false, "Should be in non-blocking mode");
+ readKey = sc.register(selector, SelectionKey.OP_READ);
+
+ debug("select");
+ int keysAdded = selector.select(TIMEOUT);
+ debug("returned " + keysAdded + " keys");
+ if (keysAdded > 0) {
+ Set<SelectionKey> keys = selector.selectedKeys();
+ Iterator<SelectionKey> i = keys.iterator();
+ while(i.hasNext()) {
+ SelectionKey sk = i.next();
+ i.remove();
+ SctpChannel readyChannel =
+ (SctpChannel)sk.channel();
+ check(readyChannel.equals(sc),
+ "channels should be equal");
+ check(!sk.isAcceptable(),
+ "key should not be acceptable");
+ check(sk.isReadable(),
+ "key should be readable");
+ check(!sk.isWritable(),
+ "key should not be writable");
+ check(!sk.isConnectable(),
+ "key should not be connectable");
+
+ /* block until we check if the client has received its COMM_UP*/
+ clientHandler.waitForCommUp();
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+ sc.receive(buffer, null, serverHandler);
+ check(serverHandler.receivedCommUp(),
+ "Accepted channel should have received COMM_UP");
+ }
+ } else {
+ fail("Server selector returned 0 ready keys");
+ /* stop the client */
+ clientThread.interrupt();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ } catch (InterruptedException unused) {
+ } finally {
+ if (readKey != null) readKey.cancel();
+ try { if (selector != null) selector.close(); }
+ catch (IOException ioe) { unexpected(ioe); }
+ try { if (ssc != null) ssc.close(); }
+ catch (IOException ioe) { unexpected(ioe); }
+ try { if (sc != null) sc.close(); }
+ catch (IOException ioe) { unexpected(ioe); }
+ }
+ }
+ }
+
+ class CommUpNotificationHandler extends AbstractNotificationHandler<Object>
+ {
+ private boolean receivedCommUp; // false
+
+ public synchronized boolean receivedCommUp() {
+ return receivedCommUp;
+ }
+
+ public synchronized boolean waitForCommUp() throws InterruptedException {
+ while (receivedCommUp == false) {
+ wait();
+ }
+
+ return false;
+ }
+
+ @Override
+ public HandlerResult handleNotification(
+ Notification notification, Object attachment) {
+ fail("Unknown notification type");
+ return HandlerResult.CONTINUE;
+ }
+
+ @Override
+ public synchronized HandlerResult handleNotification(
+ AssociationChangeNotification notification, Object attachment) {
+ AssocChangeEvent event = notification.event();
+ debug("AssociationChangeNotification");
+ debug(" Association: " + notification.association());
+ debug(" Event: " + event);
+
+ if (event.equals(AssocChangeEvent.COMM_UP)) {
+ receivedCommUp = true;
+ notifyAll();
+ }
+
+ return HandlerResult.RETURN;
+ }
+
+ @Override
+ public HandlerResult handleNotification(
+ ShutdownNotification notification, Object attachment) {
+ debug("ShutdownNotification");
+ debug(" Association: " + notification.association());
+ return HandlerResult.RETURN;
+ }
+ }
+
+ //--------------------- Infrastructure ---------------------------
+ boolean debug = true;
+ volatile int passed = 0, failed = 0;
+ void pass() {passed++;}
+ void fail() {failed++; Thread.dumpStack();}
+ void fail(String msg) {err.println(msg); fail();}
+ void unexpected(Throwable t) {failed++; t.printStackTrace();}
+ void check(boolean cond) {if (cond) pass(); else fail();}
+ void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
+ void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); } }
+ void sleep(long millis) { try { Thread.currentThread().sleep(millis); }
+ catch(InterruptedException ie) { unexpected(ie); }}
+ public static void main(String[] args) throws Throwable {
+ Class<?> k = new Object(){}.getClass().getEnclosingClass();
+ try {k.getMethod("instanceMain",String[].class)
+ .invoke( k.newInstance(), (Object) args);}
+ catch (Throwable e) {throw e.getCause();}}
+ public void instanceMain(String[] args) throws Throwable {
+ try {test(args);} catch (Throwable t) {unexpected(t);}
+ out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+ if (failed > 0) throw new AssertionError("Some tests failed");}
+
+}
--- a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java Thu Jul 23 14:06:51 2009 +0100
@@ -115,7 +115,6 @@
/* Receive the COMM_UP */
buffer.clear();
BranchNotificationHandler handler = new BranchNotificationHandler();
- channel.configureBlocking(false);
info = channel.receive(buffer, null, handler);
check(handler.receivedCommUp(), "COMM_UP no received");
Set<Association> associations = channel.associations();
--- a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java Thu Jul 23 14:06:51 2009 +0100
@@ -181,7 +181,6 @@
/* Receive the COMM_UP */
buffer.clear();
SOTNotificationHandler handler = new SOTNotificationHandler();
- smc.configureBlocking(false);
info = smc.receive(buffer, null, handler);
check(handler.receivedCommUp(), "COMM_UP no received");
Set<Association> associations = smc.associations();
@@ -220,6 +219,7 @@
}
check(found, "SCTP_PRIMARY_ADDR returned bogus address!");
+ System.out.println("Try SCTP_PRIMARY_ADDR set to: " + addrToSet);
smc.setOption(SCTP_PRIMARY_ADDR, addrToSet, assoc);
System.out.println("SCTP_PRIMARY_ADDR set to: " + addrToSet);
primaryAddr = smc.getOption(SCTP_PRIMARY_ADDR, assoc);