test/jdk/jdk/net/RdmaSockets/rsocket/Selector/SelectorTest.java
branchrsocket-branch
changeset 57115 512e7cc6ccce
child 57156 81e4a12fd1a4
equal deleted inserted replaced
53485:b743968ad646 57115:512e7cc6ccce
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /* @test
       
    25  * @bug 8195160
       
    26  * @summary Test RdmaSelector with RdmaServerSocketChannels
       
    27  * @requires (os.family == "linux")
       
    28  * @library .. /test/lib /test/jdk/java/nio/channels
       
    29  * @build RsocketTest
       
    30  * @run main/othervm BasicAccept
       
    31  */
       
    32 
       
    33 import java.io.IOException;
       
    34 import java.net.InetAddress;
       
    35 import java.net.InetSocketAddress;
       
    36 import java.net.StandardProtocolFamily;
       
    37 import java.nio.ByteBuffer;
       
    38 import java.nio.channels.Selector;
       
    39 import java.nio.channels.SelectionKey;
       
    40 import java.nio.channels.ServerSocketChannel;
       
    41 import java.nio.channels.SocketChannel;
       
    42 import java.nio.channels.spi.SelectorProvider;
       
    43 import java.util.Iterator;
       
    44 import java.util.Set;
       
    45 import jdk.net.RdmaSockets;
       
    46 
       
    47 import jtreg.SkippedException;
       
    48 
       
    49 public class SelectorTest {
       
    50     private static List clientList = new LinkedList();
       
    51     private static Random rnd = new Random();
       
    52     public static int NUM_CLIENTS = 5;
       
    53     public static int TEST_PORT = 31452;
       
    54     static PrintStream log = System.err;
       
    55     private static int FINISH_TIME = 30000;
       
    56 
       
    57     /*
       
    58      * Usage note
       
    59      *
       
    60      * java SelectorTest [server] [client <host>] [<port>]
       
    61      *
       
    62      * No arguments runs both client and server in separate threads
       
    63      * using the default port of 31452.
       
    64      *
       
    65      * client runs the client on this machine and connects to server
       
    66      * at the given IP address.
       
    67      *
       
    68      * server runs the server on localhost.
       
    69      */
       
    70     public static void main(String[] args) throws Exception {
       
    71         if (!RsocketTest.isRsocketAvailable())
       
    72             throw new SkippedException("rsocket is not available");
       
    73 
       
    74         if (args.length == 0) {
       
    75             Server server = new Server(0);
       
    76             server.start();
       
    77             try {
       
    78                 Thread.sleep(1000);
       
    79             } catch (InterruptedException e) { }
       
    80             InetSocketAddress isa
       
    81                 = new InetSocketAddress(InetAddress.getLocalHost(), server.port());
       
    82             Client client = new Client(isa);
       
    83             client.start();
       
    84             if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0)
       
    85                 throw new Exception("Failure");
       
    86             log.println();
       
    87 
       
    88         } else if (args[0].equals("server")) {
       
    89 
       
    90             if (args.length > 1)
       
    91                 TEST_PORT = Integer.parseInt(args[1]);
       
    92             Server server = new Server(TEST_PORT);
       
    93             server.start();
       
    94             if (server.finish(FINISH_TIME) == 0)
       
    95                 throw new Exception("Failure");
       
    96             log.println();
       
    97 
       
    98         } else if (args[0].equals("client")) {
       
    99 
       
   100             if (args.length < 2) {
       
   101                 log.println("No host specified: terminating.");
       
   102                 return;
       
   103             }
       
   104             String ip = args[1];
       
   105             if (args.length > 2)
       
   106                 TEST_PORT = Integer.parseInt(args[2]);
       
   107             InetAddress ia = InetAddress.getByName(ip);
       
   108             InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT);
       
   109             Client client = new Client(isa);
       
   110             client.start();
       
   111             if (client.finish(FINISH_TIME) == 0)
       
   112                 throw new Exception("Failure");
       
   113             log.println();
       
   114 
       
   115         } else {
       
   116             System.out.println("Usage note:");
       
   117             System.out.println("java SelectorTest [server] [client <host>] [<port>]");
       
   118             System.out.println("No arguments runs both client and server in separate threads using the default port of 31452.");
       
   119             System.out.println("client runs the client on this machine and connects to the server specified.");
       
   120             System.out.println("server runs the server on localhost.");
       
   121         }
       
   122     }
       
   123 
       
   124     static class Client extends TestThread {
       
   125         InetSocketAddress isa;
       
   126         Client(InetSocketAddress isa) {
       
   127             super("Client", SelectorTest.log);
       
   128             this.isa = isa;
       
   129         }
       
   130 
       
   131         public void go() throws Exception {
       
   132             log.println("starting client...");
       
   133             for (int i=0; i<NUM_CLIENTS; i++)
       
   134                 clientList.add(new RemoteEntity(i, isa, log));
       
   135 
       
   136             Collections.shuffle(clientList);
       
   137 
       
   138             log.println("created "+NUM_CLIENTS+" clients");
       
   139             do {
       
   140                 for (Iterator i = clientList.iterator(); i.hasNext(); ) {
       
   141                     RemoteEntity re = (RemoteEntity) i.next();
       
   142                     if (re.cycle()) {
       
   143                         i.remove();
       
   144                     }
       
   145                 }
       
   146                 Collections.shuffle(clientList);
       
   147             } while (clientList.size() > 0);
       
   148         }
       
   149     }
       
   150 
       
   151     static class Server extends TestThread {
       
   152         private final ServerSocketChannel ssc;
       
   153         private List socketList = new ArrayList();
       
   154         private ServerSocket ss;
       
   155         private int connectionsAccepted = 0;
       
   156         private Selector pollSelector;
       
   157         private Selector acceptSelector;
       
   158         private Set pkeys;
       
   159         private Set pskeys;
       
   160 
       
   161         Server(int port) throws IOException {
       
   162             super("Server", SelectorTest.log);
       
   163             this.ssc = RdmaSockets.openServerSocketChannel(
       
   164                 StandardProtocolFamily.INET);
       
   165             ssc.bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
       
   166         }
       
   167 
       
   168         int port() {
       
   169             return ssc.socket().getLocalPort();
       
   170         }
       
   171 
       
   172         public void go() throws Exception {
       
   173             log.println("starting server...");
       
   174             acceptSelector = RdmaSockets.openSelector();
       
   175             pollSelector = RdmaSockets.openSelector();
       
   176             pkeys = pollSelector.keys();
       
   177             pskeys = pollSelector.selectedKeys();
       
   178             Set readyKeys = acceptSelector.selectedKeys();
       
   179             RequestHandler rh = new RequestHandler(pollSelector, log);
       
   180             Thread requestThread = new Thread(rh);
       
   181 
       
   182             requestThread.start();
       
   183 
       
   184             ssc.configureBlocking(false);
       
   185             SelectionKey acceptKey = ssc.register(acceptSelector,
       
   186                                                   SelectionKey.OP_ACCEPT);
       
   187             while(connectionsAccepted < SelectorTest.NUM_CLIENTS) {
       
   188                 int keysAdded = acceptSelector.select(100);
       
   189                 if (keysAdded > 0) {
       
   190                     Iterator i = readyKeys.iterator();
       
   191                     while(i.hasNext()) {
       
   192                         SelectionKey sk = (SelectionKey)i.next();
       
   193                         i.remove();
       
   194                         ServerSocketChannel nextReady =
       
   195                             (ServerSocketChannel)sk.channel();
       
   196                         SocketChannel sc = nextReady.accept();
       
   197                         connectionsAccepted++;
       
   198                         if (sc != null) {
       
   199                             sc.configureBlocking(false);
       
   200                             synchronized (pkeys) {
       
   201                                sc.register(pollSelector, SelectionKey.OP_READ);
       
   202                             }
       
   203                         } else {
       
   204                             throw new RuntimeException(
       
   205                                 "Socket does not support Channels");
       
   206                         }
       
   207                     }
       
   208                 }
       
   209             }
       
   210             acceptKey.cancel();
       
   211             requestThread.join();
       
   212             acceptSelector.close();
       
   213             pollSelector.close();
       
   214         }
       
   215     }
       
   216 }
       
   217 
       
   218 class RemoteEntity {
       
   219     private static Random rnd = new Random();
       
   220     int id;
       
   221     ByteBuffer data;
       
   222     int dataWrittenIndex;
       
   223     int totalDataLength;
       
   224     boolean initiated = false;
       
   225     boolean connected = false;
       
   226     boolean written = false;
       
   227     boolean acked = false;
       
   228     boolean closed = false;
       
   229     private SocketChannel sc;
       
   230     ByteBuffer ackBuffer;
       
   231     PrintStream log;
       
   232     InetSocketAddress server;
       
   233 
       
   234     RemoteEntity(int id, InetSocketAddress server, PrintStream log)
       
   235         throws Exception
       
   236     {
       
   237         int connectFailures = 0;
       
   238         this.id = id;
       
   239         this.log = log;
       
   240         this.server = server;
       
   241 
       
   242         sc = RdmaSockets.openSocketChannel(StandardProtocolFamily.INET);
       
   243         sc.configureBlocking(false);
       
   244 
       
   245         // Prepare the data buffer to write out from this entity
       
   246         // Let's use both slow and fast buffers
       
   247         if (rnd.nextBoolean())
       
   248             data = ByteBuffer.allocateDirect(100);
       
   249         else
       
   250             data = ByteBuffer.allocate(100);
       
   251         String number = Integer.toString(id);
       
   252         if (number.length() == 1)
       
   253             number = "0"+number;
       
   254         String source = "Testing from " + number;
       
   255         data.put(source.getBytes("8859_1"));
       
   256         data.flip();
       
   257         totalDataLength = source.length();
       
   258 
       
   259         // Allocate an ack buffer
       
   260         ackBuffer = ByteBuffer.allocateDirect(10);
       
   261     }
       
   262 
       
   263     private void reset() throws Exception {
       
   264         sc.close();
       
   265         sc = RdmaSockets.openSocketChannel(StandardProtocolFamily.INET);
       
   266         sc.configureBlocking(false);
       
   267     }
       
   268 
       
   269     private void connect() throws Exception {
       
   270         try {
       
   271             connected = sc.connect(server);
       
   272             initiated = true;
       
   273         }  catch (ConnectException e) {
       
   274             initiated = false;
       
   275             reset();
       
   276         }
       
   277     }
       
   278 
       
   279     private void finishConnect() throws Exception {
       
   280         try {
       
   281             connected = sc.finishConnect();
       
   282         }  catch (IOException e) {
       
   283             initiated = false;
       
   284             reset();
       
   285         }
       
   286     }
       
   287 
       
   288     int id() {
       
   289         return id;
       
   290     }
       
   291 
       
   292     boolean cycle() throws Exception {
       
   293         if (!initiated)
       
   294             connect();
       
   295         else if (!connected)
       
   296             finishConnect();
       
   297         else if (!written)
       
   298             writeCycle();
       
   299         else if (!acked)
       
   300             ackCycle();
       
   301         else if (!closed)
       
   302             close();
       
   303         return closed;
       
   304     }
       
   305 
       
   306     private void ackCycle() throws Exception {
       
   307         //log.println("acking from "+id);
       
   308         int bytesRead = sc.read(ackBuffer);
       
   309         if (bytesRead > 0) {
       
   310             acked = true;
       
   311         }
       
   312     }
       
   313 
       
   314     private void close() throws Exception {
       
   315         sc.close();
       
   316         closed = true;
       
   317     }
       
   318 
       
   319     private void writeCycle() throws Exception {
       
   320         log.println("writing from "+id);
       
   321         int numBytesToWrite = rnd.nextInt(10)+1;
       
   322         int newWriteTarget = dataWrittenIndex + numBytesToWrite;
       
   323         if (newWriteTarget > totalDataLength)
       
   324             newWriteTarget = totalDataLength;
       
   325         data.limit(newWriteTarget);
       
   326         int bytesWritten = sc.write(data);
       
   327         if (bytesWritten > 0)
       
   328             dataWrittenIndex += bytesWritten;
       
   329         if (dataWrittenIndex == totalDataLength) {
       
   330             written = true;
       
   331             sc.socket().shutdownOutput();
       
   332         }
       
   333     }
       
   334 
       
   335 }
       
   336 
       
   337 
       
   338 class RequestHandler implements Runnable {
       
   339     private static Random rnd = new Random();
       
   340     private Selector selector;
       
   341     private int connectionsHandled = 0;
       
   342     private HashMap dataBin = new HashMap();
       
   343     PrintStream log;
       
   344 
       
   345     public RequestHandler(Selector selector, PrintStream log) {
       
   346         this.selector = selector;
       
   347         this.log = log;
       
   348     }
       
   349 
       
   350     public void run() {
       
   351         log.println("starting request handler...");
       
   352         int connectionsAccepted = 0;
       
   353 
       
   354         Set nKeys = selector.keys();
       
   355         Set readyKeys = selector.selectedKeys();
       
   356 
       
   357         try {
       
   358             while(connectionsHandled < SelectorTest.NUM_CLIENTS) {
       
   359                 int numKeys = selector.select(100);
       
   360 
       
   361                 // Process channels with data
       
   362                 synchronized (nKeys) {
       
   363                     if (readyKeys.size() > 0) {
       
   364                         Iterator i = readyKeys.iterator();
       
   365                         while(i.hasNext()) {
       
   366                             SelectionKey sk = (SelectionKey)i.next();
       
   367                             i.remove();
       
   368                             SocketChannel sc = (SocketChannel)sk.channel();
       
   369                             if (sc.isOpen())
       
   370                                 read(sk, sc);
       
   371                         }
       
   372                     }
       
   373                 }
       
   374 
       
   375                 // Give other threads a chance to run
       
   376                 if (numKeys == 0) {
       
   377                     try {
       
   378                         Thread.sleep(1);
       
   379                     } catch (Exception x) {}
       
   380                 }
       
   381             }
       
   382         } catch (Exception e) {
       
   383             log.println("Unexpected error 1: "+e);
       
   384             e.printStackTrace();
       
   385         }
       
   386     }
       
   387 
       
   388     private void read(SelectionKey sk, SocketChannel sc) throws Exception {
       
   389         ByteBuffer bin = (ByteBuffer)dataBin.get(sc);
       
   390         if (bin == null) {
       
   391             if (rnd.nextBoolean())
       
   392                 bin = ByteBuffer.allocateDirect(100);
       
   393             else
       
   394                 bin = ByteBuffer.allocate(100);
       
   395             dataBin.put(sc, bin);
       
   396         }
       
   397 
       
   398         int bytesRead = 0;
       
   399         do {
       
   400             bytesRead = sc.read(bin);
       
   401         } while(bytesRead > 0);
       
   402 
       
   403         if (bytesRead == -1) {
       
   404             sk.interestOps(0);
       
   405             bin.flip();
       
   406             int size = bin.limit();
       
   407             byte[] data = new byte[size];
       
   408             for(int j=0; j<size; j++)
       
   409                 data[j] = bin.get();
       
   410             String message = new String(data, "8859_1");
       
   411             connectionsHandled++;
       
   412             acknowledge(sc);
       
   413             log.println("Received >>>"+message + "<<<");
       
   414             log.println("Handled: "+connectionsHandled);
       
   415         }
       
   416     }
       
   417 
       
   418     private void acknowledge(SocketChannel sc) throws Exception {
       
   419             ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10);
       
   420             String s = "ack";
       
   421             ackBuffer.put(s.getBytes("8859_1"));
       
   422             ackBuffer.flip();
       
   423             int bytesWritten = 0;
       
   424             while(bytesWritten == 0) {
       
   425                 bytesWritten += sc.write(ackBuffer);
       
   426             }
       
   427             sc.close();
       
   428     }
       
   429 }