test/jdk/java/nio/channels/AsynchronousSocketChannel/Basic.java
changeset 49840 799e6e42b95f
parent 47216 71c04702a3d5
child 50303 7164c3bb55df
equal deleted inserted replaced
49839:dd5db907ab7e 49840:799e6e42b95f
     1 /*
     1 /*
     2  * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.
     7  * published by the Free Software Foundation.
    23 
    23 
    24 /* @test
    24 /* @test
    25  * @bug 4607272 6842687 6878369 6944810 7023403
    25  * @bug 4607272 6842687 6878369 6944810 7023403
    26  * @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed)
    26  * @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed)
    27  * @library /test/lib
    27  * @library /test/lib
    28  * @build jdk.test.lib.RandomFactory
    28  * @build jdk.test.lib.RandomFactory jdk.test.lib.Utils
    29  * @run main Basic -skipSlowConnectTest
    29  * @run main/othervm/timeout=600 Basic -skipSlowConnectTest
    30  * @key randomness intermittent
    30  * @key randomness intermittent
    31  */
    31  */
    32 
    32 
    33 import java.io.Closeable;
    33 import java.io.Closeable;
    34 import java.io.IOException;
    34 import java.io.IOException;
    77     static class Server implements Closeable {
    77     static class Server implements Closeable {
    78         private final ServerSocketChannel ssc;
    78         private final ServerSocketChannel ssc;
    79         private final InetSocketAddress address;
    79         private final InetSocketAddress address;
    80 
    80 
    81         Server() throws IOException {
    81         Server() throws IOException {
    82             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
    82             this(0);
    83 
    83         }
    84             InetAddress lh = InetAddress.getLocalHost();
    84 
    85             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
    85         Server(int recvBufSize) throws IOException {
    86             address = new InetSocketAddress(lh, port);
    86             ssc = ServerSocketChannel.open();
       
    87             if (recvBufSize > 0) {
       
    88                 ssc.setOption(SO_RCVBUF, recvBufSize);
       
    89             }
       
    90             ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
       
    91             address = (InetSocketAddress)ssc.getLocalAddress();
    87         }
    92         }
    88 
    93 
    89         InetSocketAddress address() {
    94         InetSocketAddress address() {
    90             return address;
    95             return address;
    91         }
    96         }
   291             // expected
   296             // expected
   292         }
   297         }
   293 
   298 
   294         System.out.println("-- asynchronous close when reading --");
   299         System.out.println("-- asynchronous close when reading --");
   295 
   300 
   296         try (Server server = new Server()) {
   301         try (Server server = new Server(1)) {
   297             ch = AsynchronousSocketChannel.open();
   302             ch = AsynchronousSocketChannel.open();
   298             ch.connect(server.address()).get();
   303             ch.connect(server.address()).get();
   299 
   304 
   300             ByteBuffer dst = ByteBuffer.allocateDirect(100);
   305             ByteBuffer dst = ByteBuffer.allocateDirect(100);
   301             Future<Integer> result = ch.read(dst);
   306             Future<Integer> result = ch.read(dst);
   323 
   328 
   324             System.out.println("-- asynchronous close when writing --");
   329             System.out.println("-- asynchronous close when writing --");
   325 
   330 
   326             ch = AsynchronousSocketChannel.open();
   331             ch = AsynchronousSocketChannel.open();
   327             ch.connect(server.address()).get();
   332             ch.connect(server.address()).get();
       
   333             SocketChannel peer = server.accept();
       
   334             peer.setOption(SO_RCVBUF, 1);
   328 
   335 
   329             final AtomicReference<Throwable> writeException =
   336             final AtomicReference<Throwable> writeException =
   330                 new AtomicReference<Throwable>();
   337                 new AtomicReference<Throwable>();
   331 
   338 
   332             // write bytes to fill socket buffer
   339             // write bytes to fill socket buffer
   333             final AtomicInteger numCompleted = new AtomicInteger();
   340             final AtomicInteger numCompleted = new AtomicInteger();
   334             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
   341             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
   335                 public void completed(Integer result, AsynchronousSocketChannel ch) {
   342                 public void completed(Integer result, AsynchronousSocketChannel ch) {
       
   343                     System.out.println("completed write to async channel: " + result);
   336                     numCompleted.incrementAndGet();
   344                     numCompleted.incrementAndGet();
   337                     ch.write(genBuffer(), ch, this);
   345                     ch.write(genBuffer(), ch, this);
       
   346                     System.out.println("started another write to async channel: " + result);
   338                 }
   347                 }
   339                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
   348                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
       
   349                     System.out.println("failed write to async channel");
   340                     writeException.set(x);
   350                     writeException.set(x);
   341                 }
   351                 }
   342             });
   352             });
   343 
   353 
   344             // give time for socket buffer to fill up -
   354             // give time for socket buffer to fill up -
   345             // take pauses until the handler is no longer being invoked
   355             // take pauses until the handler is no longer being invoked
   346             // because all writes are being pended which guarantees that
   356             // because all writes are being pended which guarantees that
   347             // the internal channel state indicates it is writing
   357             // the internal channel state indicates it is writing
   348             int prevNumCompleted = numCompleted.get();
   358             int prevNumCompleted = numCompleted.get();
   349             do {
   359             do {
   350                 Thread.sleep(1000);
   360                 Thread.sleep((long)(1000 * jdk.test.lib.Utils.TIMEOUT_FACTOR));
       
   361                 System.out.println("check if buffer is filled up");
   351                 if (numCompleted.get() == prevNumCompleted) {
   362                 if (numCompleted.get() == prevNumCompleted) {
   352                     break;
   363                     break;
   353                 }
   364                 }
   354                 prevNumCompleted = numCompleted.get();
   365                 prevNumCompleted = numCompleted.get();
   355             } while (true);
   366             } while (true);
   356 
   367 
   357             // attempt a concurrent write -
   368             // attempt a concurrent write -
   358             // should fail with WritePendingException
   369             // should fail with WritePendingException
   359             try {
   370             try {
       
   371                 System.out.println("concurrent write to async channel");
   360                 ch.write(genBuffer());
   372                 ch.write(genBuffer());
       
   373                 System.out.format("prevNumCompleted: %d, numCompleted: %d%n",
       
   374                                   prevNumCompleted, numCompleted.get());
   361                 throw new RuntimeException("WritePendingException expected");
   375                 throw new RuntimeException("WritePendingException expected");
   362             } catch (WritePendingException x) {
   376             } catch (WritePendingException x) {
   363             }
   377             }
   364 
   378 
   365             // close channel - should cause initial write to complete
   379             // close channel - should cause initial write to complete
       
   380             System.out.println("closing async channel...");
   366             ch.close();
   381             ch.close();
   367             server.accept().close();
   382             System.out.println("closed async channel");
       
   383             peer.close();
   368 
   384 
   369             // wait for exception
   385             // wait for exception
   370             while (writeException.get() == null) {
   386             while (writeException.get() == null) {
   371                 Thread.sleep(100);
   387                 Thread.sleep(100);
   372             }
   388             }