test/jdk/jdk/net/RdmaSockets/rsocket/SocketChannel/VectorIO.java
branch rsocket-branch
changeset 57115 512e7cc6ccce
child 57156 81e4a12fd1a4
equal deleted inserted replaced
53485:b743968ad646 57115:512e7cc6ccce
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /* @test
       
    25  * @bug 8195160
       
    26  * @summary Test socketchannel vector IO (use -Dseed=X to set PRNG seed)
       
    27  * @requires (os.family == "linux")
       
    28  * @library .. /test/lib /test/jdk/java/nio/channels
       
    29  * @build jdk.test.lib.RandomFactory
       
    30  * @build RsocketTest
       
    31  * @run main/othervm VectorIO
       
    32  * @key randomness
       
    33  */
       
    34 
       
    35 import java.io.IOException;
       
    36 import java.net.InetAddress;
       
    37 import java.net.InetSocketAddress;
       
    38 import java.net.StandardProtocolFamily;
       
    39 import java.nio.ByteBuffer;
       
    40 import java.nio.channels.ServerSocketChannel;
       
    41 import java.nio.channels.SocketChannel;
       
    42 import java.util.Random;
       
    43 import jdk.test.lib.RandomFactory;
       
    44 import jdk.net.RdmaSockets;
       
    45 
       
    46 import jtreg.SkippedException;
       
    47 
       
    48 public class VectorIO {
       
    49 
       
    50     private static Random generator = RandomFactory.getRandom();
       
    51 
       
    52     static int testSize;
       
    53 
       
    54     // whether to use the write/read variant with a length parameter
       
    55     static boolean setLength;
       
    56 
       
    57     public static void main(String[] args) throws Exception {
       
    58         if (!RsocketTest.isRsocketAvailable())
       
    59             throw new SkippedException("rsocket is not available");
       
    60 
       
    61         testSize = 1;
       
    62         setLength = false;
       
    63         runTest();
       
    64         for(int i = 15; i < 18; i++) {
       
    65             testSize = i;
       
    66             setLength = !setLength;
       
    67             runTest();
       
    68         }
       
    69     }
       
    70 
       
    71     static void runTest() throws Exception {
       
    72         System.err.println("Length " + testSize);
       
    73         Server sv = new Server(testSize);
       
    74         sv.start();
       
    75         bufferTest(sv.port());
       
    76         if (sv.finish(8000) == 0)
       
    77             throw new Exception("Failed: Length = " + testSize);
       
    78     }
       
    79 
       
    80     static void bufferTest(int port) throws Exception {
       
    81         ByteBuffer[] bufs = new ByteBuffer[testSize];
       
    82         long total = 0L;
       
    83         for(int i = 0; i < testSize; i++) {
       
    84             String source = "buffer" + i;
       
    85             if (generator.nextBoolean())
       
    86                 bufs[i] = ByteBuffer.allocateDirect(source.length());
       
    87             else
       
    88                 bufs[i] = ByteBuffer.allocate(source.length());
       
    89 
       
    90             bufs[i].put(source.getBytes("8859_1"));
       
    91             bufs[i].flip();
       
    92             total += bufs[i].remaining();
       
    93         }
       
    94 
       
    95         ByteBuffer[] bufsPlus1 = new ByteBuffer[bufs.length + 1];
       
    96         System.arraycopy(bufs, 0, bufsPlus1, 0, bufs.length);
       
    97 
       
    98         // Get a connection to the server
       
    99         InetAddress lh = InetAddress.getLocalHost();
       
   100         InetSocketAddress isa = new InetSocketAddress(lh, port);
       
   101         SocketChannel sc = RdmaSockets.openSocketChannel(StandardProtocolFamily.INET);
       
   102         sc.connect(isa);
       
   103         sc.configureBlocking(generator.nextBoolean());
       
   104 
       
   105         // Write the data out
       
   106         long rem = total;
       
   107         while (rem > 0L) {
       
   108             long bytesWritten;
       
   109             if (setLength) {
       
   110                 bytesWritten = sc.write(bufsPlus1, 0, bufs.length);
       
   111             } else {
       
   112                 bytesWritten = sc.write(bufs);
       
   113             }
       
   114             if (bytesWritten == 0) {
       
   115                 if (sc.isBlocking()) {
       
   116                     throw new RuntimeException("write did not block");
       
   117                 } else {
       
   118                     System.err.println("Non-blocking write() wrote zero bytes");
       
   119                 }
       
   120                 Thread.sleep(50);
       
   121             } else {
       
   122                 rem -= bytesWritten;
       
   123             }
       
   124         }
       
   125 
       
   126         // Clean up
       
   127         sc.close();
       
   128     }
       
   129 
       
   130     static class Server
       
   131         extends TestThread
       
   132     {
       
   133         final int testSize;
       
   134         final ServerSocketChannel ssc;
       
   135 
       
   136         Server(int testSize) throws IOException {
       
   137             super("Server " + testSize);
       
   138             this.testSize = testSize;
       
   139             this.ssc = RdmaSockets.openServerSocketChannel(
       
   140                 StandardProtocolFamily.INET).bind(new InetSocketAddress(0));
       
   141         }
       
   142 
       
   143         int port() {
       
   144             return ssc.socket().getLocalPort();
       
   145         }
       
   146 
       
   147         void go() throws Exception {
       
   148             bufferTest();
       
   149         }
       
   150 
       
   151         void bufferTest() throws Exception {
       
   152             long total = 0L;
       
   153             ByteBuffer[] bufs = new ByteBuffer[testSize];
       
   154             for(int i=0; i<testSize; i++) {
       
   155                 String source = "buffer" + i;
       
   156                 if (generator.nextBoolean())
       
   157                     bufs[i] = ByteBuffer.allocateDirect(source.length());
       
   158                 else
       
   159                     bufs[i] = ByteBuffer.allocate(source.length());
       
   160                 total += bufs[i].capacity();
       
   161             }
       
   162 
       
   163             ByteBuffer[] bufsPlus1 = new ByteBuffer[bufs.length + 1];
       
   164             System.arraycopy(bufs, 0, bufsPlus1, 0, bufs.length);
       
   165 
       
   166             // Get a connection from client
       
   167             SocketChannel sc = null;
       
   168 
       
   169             try {
       
   170 
       
   171                 ssc.configureBlocking(false);
       
   172 
       
   173                 for (;;) {
       
   174                     sc = ssc.accept();
       
   175                     if (sc != null) {
       
   176                         System.err.println("accept() succeeded");
       
   177                         break;
       
   178                     }
       
   179                     Thread.sleep(50);
       
   180                 }
       
   181 
       
   182                 sc.configureBlocking(generator.nextBoolean());
       
   183 
       
   184                 // Read data into multiple buffers
       
   185                 long avail = total;
       
   186                 while (avail > 0) {
       
   187                     long bytesRead;
       
   188                     if (setLength) {
       
   189                         bytesRead = sc.read(bufsPlus1, 0, bufs.length);
       
   190                     } else {
       
   191                         bytesRead = sc.read(bufs);
       
   192                     }
       
   193                     if (bytesRead < 0)
       
   194                         break;
       
   195                     if (bytesRead == 0) {
       
   196                         if (sc.isBlocking()) {
       
   197                             throw new RuntimeException("read did not block");
       
   198                         } else {
       
   199                             System.err.println
       
   200                                 ("Non-blocking read() read zero bytes");
       
   201                         }
       
   202                         Thread.sleep(50);
       
   203                     }
       
   204                     avail -= bytesRead;
       
   205                 }
       
   206 
       
   207                 // Check results
       
   208                 for(int i=0; i<testSize; i++) {
       
   209                     String expected = "buffer" + i;
       
   210                     bufs[i].flip();
       
   211                     int size = bufs[i].capacity();
       
   212                     byte[] data = new byte[size];
       
   213                     for(int j=0; j<size; j++)
       
   214                         data[j] = bufs[i].get();
       
   215                     String message = new String(data, "8859_1");
       
   216                     if (!message.equals(expected))
       
   217                         throw new Exception("Wrong data: Got "
       
   218                                             + message + ", expected "
       
   219                                             + expected);
       
   220                 }
       
   221 
       
   222             } finally {
       
   223                 // Clean up
       
   224                 ssc.close();
       
   225                 if (sc != null)
       
   226                     sc.close();
       
   227             }
       
   228 
       
   229         }
       
   230 
       
   231     }
       
   232 }