6863110: Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
authorchegar
Thu, 23 Jul 2009 14:06:51 +0100
changeset 3320 a7c037dd2e14
parent 3319 53a6d815c92f
child 3322 82096b9a8dd3
child 3416 0dd4f82becda
6863110: Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector Reviewed-by: jccollet
jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java
jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java
jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c
jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java
jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java
jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java
--- a/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java	Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/src/solaris/classes/sun/nio/ch/SctpChannelImpl.java	Thu Jul 23 14:06:51 2009 +0100
@@ -127,8 +127,6 @@
 
     /* -- End of fields protected by stateLock -- */
 
-    private SctpResultContainer commUpResultContainer;  /* null */
-
     /**
      * Constructor for normal connecting sockets
      */
@@ -761,12 +759,6 @@
                     if (!ensureReceiveOpen())
                         return null;
 
-                    if (commUpResultContainer != null) {
-                        resultContainer = commUpResultContainer;
-                        commUpResultContainer = null;
-                        continue;
-                    }
-
                     int n = 0;
                     try {
                         begin();
@@ -778,7 +770,7 @@
                         }
 
                         do {
-                            n = receive(fdVal, buffer, resultContainer);
+                            n = receive(fdVal, buffer, resultContainer, fromConnect);
                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
                     } finally {
                         receiverCleanup();
@@ -809,9 +801,9 @@
 
                     if (fromConnect)  {
                         /* If we reach here, then it was connect that invoked
-                         * receive an received the COMM_UP. Save it and allow
-                         * the user handler to process it upon next receive. */
-                        commUpResultContainer = resultContainer;
+                         * receive and received the COMM_UP. We have already
+                         * handled the COMM_UP with the internal notification
+                         * handler. Simply return. */
                         return null;
                     }
                 }  /* receiveLock */
@@ -827,20 +819,21 @@
 
     private int receive(int fd,
                         ByteBuffer dst,
-                        SctpResultContainer resultContainer)
+                        SctpResultContainer resultContainer,
+                        boolean peek)
             throws IOException {
         int pos = dst.position();
         int lim = dst.limit();
         assert (pos <= lim);
         int rem = (pos <= lim ? lim - pos : 0);
         if (dst instanceof DirectBuffer && rem > 0)
-            return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
+            return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
 
         /* Substitute a native buffer */
         int newSize = Math.max(rem, 1);
         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
         try {
-            int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
+            int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
             bb.flip();
             if (n > 0 && rem > 0)
                 dst.put(bb);
@@ -854,10 +847,11 @@
                                         SctpResultContainer resultContainer,
                                         ByteBuffer bb,
                                         int rem,
-                                        int pos)
+                                        int pos,
+                                        boolean peek)
         throws IOException
     {
-        int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem);
+        int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
 
         if (n > 0)
             bb.position(pos + n);
@@ -1089,7 +1083,7 @@
     private static native void initIDs();
 
     static native int receive0(int fd, SctpResultContainer resultContainer,
-            long address, int length) throws IOException;
+            long address, int length, boolean peek) throws IOException;
 
     static native int send0(int fd, long address, int length,
             SocketAddress target, int assocId, int streamNumber,
--- a/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java	Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/src/solaris/classes/sun/nio/ch/SctpMultiChannelImpl.java	Thu Jul 23 14:06:51 2009 +0100
@@ -31,6 +31,8 @@
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.HashMap;
@@ -702,7 +704,7 @@
             int assocId = association.associationID();
             Set<SocketAddress> addresses = null;
 
-            try {
+             try {
                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
             } catch (IOException unused) {
                 /* OK, determining connected addresses may not be possible
@@ -723,9 +725,11 @@
                 /* We cannot determine the connected addresses */
                 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs =
                         addressMap.entrySet();
-                for (java.util.Map.Entry<SocketAddress, Association> entry : addrAssocs) {
+                Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator();
+                while (iterator.hasNext()) {
+                    Entry<SocketAddress, Association> entry = iterator.next();
                     if (entry.getValue().equals(association)) {
-                        addressMap.remove(entry.getKey());
+                        iterator.remove();
                     }
                 }
             }
@@ -957,7 +961,7 @@
                                 int length)
             throws IOException{
         return SctpChannelImpl.receive0(fd, resultContainer, address,
-                length);
+                length, false /*peek */);
     }
 
     private static int send0(int fd,
--- a/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c	Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/src/solaris/native/sun/nio/ch/SctpChannelImpl.c	Thu Jul 23 14:06:51 2009 +0100
@@ -417,11 +417,11 @@
 /*
  * Class:     sun_nio_ch_SctpChannelImpl
  * Method:    receive0
- * Signature: (ILsun/nio/ch/SctpResultContainer;JI)I
+ * Signature: (ILsun/nio/ch/SctpResultContainer;JIZ)I
  */
 JNIEXPORT jint JNICALL Java_sun_nio_ch_SctpChannelImpl_receive0
   (JNIEnv *env, jclass klass, jint fd, jobject resultContainerObj,
-   jlong address, jint length) {
+   jlong address, jint length, jboolean peek) {
     SOCKADDR sa;
     int sa_len = sizeof(sa);
     ssize_t rv = 0;
@@ -429,6 +429,7 @@
     struct iovec iov[1];
     struct msghdr msg[1];
     char cbuf[CMSG_SPACE(sizeof (struct sctp_sndrcvinfo))];
+    int flags = peek == JNI_TRUE ? MSG_PEEK : 0;
 
     /* Set up the msghdr structure for receiving */
     memset(msg, 0, sizeof (*msg));
@@ -443,7 +444,7 @@
     msg->msg_flags = 0;
 
     do {
-        if ((rv = recvmsg(fd, msg, 0)) < 0) {
+        if ((rv = recvmsg(fd, msg, flags)) < 0) {
             if (errno == EWOULDBLOCK) {
                 return IOS_UNAVAILABLE;
             } else if (errno == EINTR) {
@@ -473,7 +474,7 @@
                 memcpy(buf, addr, rv);
                 iov->iov_base = buf + rv;
                 iov->iov_len = NOTIFICATION_BUFFER_SIZE - rv;
-                if ((rv = recvmsg(fd, msg, 0)) < 0) {
+                if ((rv = recvmsg(fd, msg, flags)) < 0) {
                     handleSocketError(env, errno);
                     return 0;
                 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/com/sun/nio/sctp/SctpChannel/CommUp.java	Thu Jul 23 14:06:51 2009 +0100
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2009 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/* @test
+ * @bug 6863110
+ * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
+ * @author chegar
+ */
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import com.sun.nio.sctp.AbstractNotificationHandler;
+import com.sun.nio.sctp.AssociationChangeNotification;
+import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
+import com.sun.nio.sctp.HandlerResult;
+import com.sun.nio.sctp.Notification;
+import com.sun.nio.sctp.SctpChannel;
+import com.sun.nio.sctp.SctpServerChannel;
+import com.sun.nio.sctp.ShutdownNotification;
+import static java.lang.System.out;
+import static java.lang.System.err;
+import static java.nio.channels.SelectionKey.OP_CONNECT;
+import static java.nio.channels.SelectionKey.OP_READ;
+
+public class CommUp {
+    static CountDownLatch acceptLatch = new CountDownLatch(1);
+    static final int TIMEOUT = 10000;
+
+    CommUpNotificationHandler clientHandler = new CommUpNotificationHandler();
+    CommUpNotificationHandler serverHandler = new CommUpNotificationHandler();
+    CommUpServer server;
+    Thread clientThread;
+
+    void test(String[] args) {
+        SocketAddress address = null;
+
+        if (!Util.isSCTPSupported()) {
+            out.println("SCTP protocol is not supported");
+            out.println("Test cannot be run");
+            return;
+        }
+
+        if (args.length == 2) {
+            /* requested to connecct to a specific address */
+            try {
+                int port = Integer.valueOf(args[1]);
+                address = new InetSocketAddress(args[0], port);
+            } catch (NumberFormatException nfe) {
+                err.println(nfe);
+            }
+        } else {
+            /* start server on local machine, default */
+            try {
+                server = new CommUpServer();
+                server.start();
+                address = server.address();
+                debug("Server started and listening on " + address);
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+                return;
+            }
+        }
+
+        /* store the main thread so that the server can interrupt it, if necessary */
+        clientThread = Thread.currentThread();
+
+        doClient(address);
+    }
+
+    void doClient(SocketAddress peerAddress) {
+        SctpChannel sc = null;
+        try {
+            debug("connecting to " + peerAddress);
+            sc = SctpChannel.open();
+            sc.configureBlocking(false);
+            check(sc.isBlocking() == false, "Should be in non-blocking mode");
+            sc.connect(peerAddress);
+
+            Selector selector = Selector.open();
+            SelectionKey selectiontKey = sc.register(selector, OP_CONNECT);
+
+            /* Expect two interest Ops */
+            boolean opConnectReceived = false;
+            boolean opReadReceived = false;
+            for (int z=0; z<2; z++) {
+                debug("select " + z);
+                int keysAdded = selector.select(TIMEOUT);
+                debug("returned " + keysAdded + " keys");
+                if (keysAdded > 0) {
+                    Set<SelectionKey> keys = selector.selectedKeys();
+                    Iterator<SelectionKey> i = keys.iterator();
+                    while(i.hasNext()) {
+                        SelectionKey sk = i.next();
+                        i.remove();
+                        SctpChannel readyChannel =
+                            (SctpChannel)sk.channel();
+
+                        /* OP_CONNECT */
+                        if (sk.isConnectable()) {
+                            /* some trivial checks */
+                            check(opConnectReceived == false,
+                                  "should only received one OP_CONNECT");
+                            check(opReadReceived == false,
+                                  "should not receive OP_READ before OP_CONNECT");
+                            check(readyChannel.equals(sc),
+                                  "channels should be equal");
+                            check(!sk.isAcceptable(),
+                                  "key should not be acceptable");
+                            check(!sk.isReadable(),
+                                  "key should not be readable");
+                            check(!sk.isWritable(),
+                                  "key should not be writable");
+
+                            /* now process the OP_CONNECT */
+                            opConnectReceived = true;
+                            check((sk.interestOps() & OP_CONNECT) == OP_CONNECT,
+                                  "selection key interest ops should contain OP_CONNECT");
+                            sk.interestOps(OP_READ);
+                            check((sk.interestOps() & OP_CONNECT) != OP_CONNECT,
+                                  "selection key interest ops should not contain OP_CONNECT");
+                            check(sc.finishConnect(),
+                                  "finishConnect should return true");
+                        } /* OP_READ */
+                          else if (sk.isReadable()) {
+                            /* some trivial checks */
+                            check(opConnectReceived == true,
+                                  "should receive one OP_CONNECT before OP_READ");
+                            check(opReadReceived == false,
+                                  "should not receive OP_READ before OP_CONNECT");
+                            check(readyChannel.equals(sc),
+                                  "channels should be equal");
+                            check(!sk.isAcceptable(),
+                                  "key should not be acceptable");
+                            check(sk.isReadable(),
+                                  "key should be readable");
+                            check(!sk.isWritable(),
+                                  "key should not be writable");
+                            check(!sk.isConnectable(),
+                                  "key should not be connectable");
+
+                            /* now process the OP_READ */
+                            opReadReceived = true;
+                            selectiontKey.cancel();
+
+                            /* try with small buffer to see if native
+                             * implementation can handle this */
+                            ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+                            readyChannel.receive(buffer, null, clientHandler);
+                            check(clientHandler.receivedCommUp(),
+                                    "Client should have received COMM_UP");
+
+                            /* dont close (or put anything on) the channel until
+                             * we check that the server's accepted channel also
+                             * received COMM_UP */
+                            serverHandler.waitForCommUp();
+                        } else {
+                            fail("Unexpected selection key");
+                        }
+                    }
+                } else {
+                    fail("Client selector returned 0 ready keys");
+                    /* stop the server */
+                    server.thread().interrupt();
+                }
+            } //for
+
+        } catch (IOException ioe) {
+            unexpected(ioe);
+        } catch (InterruptedException ie) {
+            unexpected(ie);
+        }
+    }
+
+    class CommUpServer implements Runnable
+    {
+        final InetSocketAddress serverAddr;
+        private SctpServerChannel ssc;
+        private Thread serverThread;
+
+        public CommUpServer() throws IOException {
+            ssc = SctpServerChannel.open().bind(null);
+            java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
+            if (addrs.isEmpty())
+                debug("addrs should not be empty");
+
+            serverAddr = (InetSocketAddress) addrs.iterator().next();
+        }
+
+        void start() {
+            serverThread = new Thread(this, "CommUpServer-"  +
+                                              serverAddr.getPort());
+            serverThread.start();
+        }
+
+        InetSocketAddress address () {
+            return serverAddr;
+        }
+
+        Thread thread() {
+            return serverThread;
+        }
+
+        @Override
+        public void run() {
+            Selector selector = null;
+            SctpChannel sc = null;
+            SelectionKey readKey = null;
+            try {
+                sc = ssc.accept();
+                debug("accepted " + sc);
+
+                selector = Selector.open();
+                sc.configureBlocking(false);
+                check(sc.isBlocking() == false, "Should be in non-blocking mode");
+                readKey = sc.register(selector, SelectionKey.OP_READ);
+
+                debug("select");
+                int keysAdded = selector.select(TIMEOUT);
+                debug("returned " + keysAdded + " keys");
+                if (keysAdded > 0) {
+                    Set<SelectionKey> keys = selector.selectedKeys();
+                    Iterator<SelectionKey> i = keys.iterator();
+                    while(i.hasNext()) {
+                        SelectionKey sk = i.next();
+                        i.remove();
+                        SctpChannel readyChannel =
+                            (SctpChannel)sk.channel();
+                        check(readyChannel.equals(sc),
+                                "channels should be equal");
+                        check(!sk.isAcceptable(),
+                                "key should not be acceptable");
+                        check(sk.isReadable(),
+                                "key should be readable");
+                        check(!sk.isWritable(),
+                                "key should not be writable");
+                        check(!sk.isConnectable(),
+                                "key should not be connectable");
+
+                        /* block until we check if the client has received its COMM_UP*/
+                        clientHandler.waitForCommUp();
+
+                        ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+                        sc.receive(buffer, null, serverHandler);
+                        check(serverHandler.receivedCommUp(),
+                                "Accepted channel should have received COMM_UP");
+                    }
+                } else {
+                   fail("Server selector returned 0 ready keys");
+                   /* stop the client */
+                   clientThread.interrupt();
+            }
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            } catch (InterruptedException unused) {
+            } finally {
+                if (readKey != null) readKey.cancel();
+                try { if (selector != null) selector.close(); }
+                catch (IOException  ioe) { unexpected(ioe); }
+                try { if (ssc != null) ssc.close(); }
+                catch (IOException  ioe) { unexpected(ioe); }
+                try { if (sc != null) sc.close(); }
+                catch (IOException  ioe) { unexpected(ioe); }
+            }
+        }
+    }
+
+    class CommUpNotificationHandler extends AbstractNotificationHandler<Object>
+    {
+        private boolean receivedCommUp;  // false
+
+        public synchronized boolean receivedCommUp() {
+            return receivedCommUp;
+        }
+
+        public synchronized boolean waitForCommUp() throws InterruptedException {
+            while (receivedCommUp == false) {
+                wait();
+            }
+
+            return false;
+        }
+
+        @Override
+        public HandlerResult handleNotification(
+                Notification notification, Object attachment) {
+            fail("Unknown notification type");
+            return HandlerResult.CONTINUE;
+        }
+
+        @Override
+        public synchronized HandlerResult handleNotification(
+                AssociationChangeNotification notification, Object attachment) {
+            AssocChangeEvent event = notification.event();
+            debug("AssociationChangeNotification");
+            debug("  Association: " + notification.association());
+            debug("  Event: " + event);
+
+            if (event.equals(AssocChangeEvent.COMM_UP)) {
+                receivedCommUp = true;
+                notifyAll();
+            }
+
+            return HandlerResult.RETURN;
+        }
+
+        @Override
+        public HandlerResult handleNotification(
+                ShutdownNotification notification, Object attachment) {
+            debug("ShutdownNotification");
+            debug("  Association: " + notification.association());
+            return HandlerResult.RETURN;
+        }
+    }
+
+        //--------------------- Infrastructure ---------------------------
+    boolean debug = true;
+    volatile int passed = 0, failed = 0;
+    void pass() {passed++;}
+    void fail() {failed++; Thread.dumpStack();}
+    void fail(String msg) {err.println(msg); fail();}
+    void unexpected(Throwable t) {failed++; t.printStackTrace();}
+    void check(boolean cond) {if (cond) pass(); else fail();}
+    void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
+    void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); }  }
+    void sleep(long millis) { try { Thread.currentThread().sleep(millis); }
+                          catch(InterruptedException ie) { unexpected(ie); }}
+    public static void main(String[] args) throws Throwable {
+        Class<?> k = new Object(){}.getClass().getEnclosingClass();
+        try {k.getMethod("instanceMain",String[].class)
+                .invoke( k.newInstance(), (Object) args);}
+        catch (Throwable e) {throw e.getCause();}}
+    public void instanceMain(String[] args) throws Throwable {
+        try {test(args);} catch (Throwable t) {unexpected(t);}
+        out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
+        if (failed > 0) throw new AssertionError("Some tests failed");}
+
+}
--- a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java	Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/Branch.java	Thu Jul 23 14:06:51 2009 +0100
@@ -115,7 +115,6 @@
             /* Receive the COMM_UP */
             buffer.clear();
             BranchNotificationHandler handler = new BranchNotificationHandler();
-            channel.configureBlocking(false);
             info = channel.receive(buffer, null, handler);
             check(handler.receivedCommUp(), "COMM_UP no received");
             Set<Association> associations = channel.associations();
--- a/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java	Wed Jul 22 07:49:57 2009 -0700
+++ b/jdk/test/com/sun/nio/sctp/SctpMultiChannel/SocketOptionTests.java	Thu Jul 23 14:06:51 2009 +0100
@@ -181,7 +181,6 @@
         /* Receive the COMM_UP */
         buffer.clear();
         SOTNotificationHandler handler = new SOTNotificationHandler();
-        smc.configureBlocking(false);
         info = smc.receive(buffer, null, handler);
         check(handler.receivedCommUp(), "COMM_UP no received");
         Set<Association> associations = smc.associations();
@@ -220,6 +219,7 @@
             }
             check(found, "SCTP_PRIMARY_ADDR returned bogus address!");
 
+            System.out.println("Try SCTP_PRIMARY_ADDR set to: " + addrToSet);
             smc.setOption(SCTP_PRIMARY_ADDR, addrToSet, assoc);
             System.out.println("SCTP_PRIMARY_ADDR set to: " + addrToSet);
             primaryAddr = smc.getOption(SCTP_PRIMARY_ADDR, assoc);