test/jdk/java/nio/channels/SocketChannel/AdaptorStreams.java
changeset 54620 13b67c1420b8
child 54902 d1717e05e51c
equal deleted inserted replaced
54619:b43cc3b9ef40 54620:13b67c1420b8
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /* @test
       
    25  * @bug 8222774 4430139
       
    26  * @run testng AdaptorStreams
       
    27  * @summary Exercise socket adaptor input/output streams
       
    28  */
       
    29 
       
    30 import java.io.Closeable;
       
    31 import java.io.IOException;
       
    32 import java.io.InputStream;
       
    33 import java.io.OutputStream;
       
    34 import java.net.ServerSocket;
       
    35 import java.net.Socket;
       
    36 import java.net.SocketTimeoutException;
       
    37 import java.nio.channels.IllegalBlockingModeException;
       
    38 import java.nio.channels.SocketChannel;
       
    39 import java.util.concurrent.ExecutorService;
       
    40 import java.util.concurrent.Executors;
       
    41 import java.util.concurrent.Future;
       
    42 import java.util.concurrent.ScheduledExecutorService;
       
    43 import java.util.concurrent.TimeUnit;
       
    44 
       
    45 import org.testng.annotations.Test;
       
    46 import static org.testng.Assert.*;
       
    47 
       
    48 @Test
       
    49 public class AdaptorStreams {
       
    50 
       
    51     /**
       
    52      * Test read when bytes are available
       
    53      */
       
    54     public void testRead1() throws Exception {
       
    55         withConnection((sc, peer) -> {
       
    56             peer.getOutputStream().write(99);
       
    57             int n = sc.socket().getInputStream().read();
       
    58             assertTrue(n == 99);
       
    59         });
       
    60     }
       
    61 
       
    62     /**
       
    63      * Test read blocking before bytes are available
       
    64      */
       
    65     public void testRead2() throws Exception {
       
    66         withConnection((sc, peer) -> {
       
    67             scheduleWrite(peer.getOutputStream(), 99, 1000);
       
    68             int n = sc.socket().getInputStream().read();
       
    69             assertTrue(n == 99);
       
    70         });
       
    71     }
       
    72 
       
    73     /**
       
    74      * Test read when peer has closed connection
       
    75      */
       
    76     public void testRead3() throws Exception {
       
    77         withConnection((sc, peer) -> {
       
    78             peer.close();
       
    79             int n = sc.socket().getInputStream().read();
       
    80             assertTrue(n == -1);
       
    81         });
       
    82     }
       
    83 
       
    84     /**
       
    85      * Test read blocking before peer closes connection
       
    86      */
       
    87     public void testRead4() throws Exception {
       
    88         withConnection((sc, peer) -> {
       
    89             scheduleClose(peer, 1000);
       
    90             int n = sc.socket().getInputStream().read();
       
    91             assertTrue(n == -1);
       
    92         });
       
    93     }
       
    94 
       
    95     /**
       
    96      * Test async close of socket when thread blocked in read
       
    97      */
       
    98     public void testRead5() throws Exception {
       
    99         withConnection((sc, peer) -> {
       
   100             scheduleClose(sc, 2000);
       
   101             InputStream in = sc.socket().getInputStream();
       
   102             expectThrows(IOException.class, () -> in.read());
       
   103         });
       
   104     }
       
   105 
       
   106     /**
       
   107      * Test interrupt status set before read
       
   108      */
       
   109     public void testRead6() throws Exception {
       
   110         withConnection((sc, peer) -> {
       
   111             Socket s = sc.socket();
       
   112             Thread.currentThread().interrupt();
       
   113             try {
       
   114                 InputStream in = s.getInputStream();
       
   115                 expectThrows(IOException.class, () -> in.read());
       
   116             } finally {
       
   117                 Thread.interrupted();  // clear interrupt
       
   118             }
       
   119             assertTrue(s.isClosed());
       
   120         });
       
   121     }
       
   122 
       
   123     /**
       
   124      * Test interrupt of thread blocked in read
       
   125      */
       
   126     public void testRead7() throws Exception {
       
   127         withConnection((sc, peer) -> {
       
   128             Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
       
   129             Socket s = sc.socket();
       
   130             try {
       
   131                 InputStream in = s.getInputStream();
       
   132                 expectThrows(IOException.class, () -> in.read());
       
   133             } finally {
       
   134                 interrupter.cancel(true);
       
   135                 Thread.interrupted();  // clear interrupt
       
   136             }
       
   137             assertTrue(s.isClosed());
       
   138         });
       
   139     }
       
   140 
       
   141     /**
       
   142      * Test read when channel is configured non-blocking
       
   143      */
       
   144     public void testRead8() throws Exception {
       
   145         withConnection((sc, peer) -> {
       
   146             sc.configureBlocking(false);
       
   147             InputStream in = sc.socket().getInputStream();
       
   148             expectThrows(IllegalBlockingModeException.class, () -> in.read());
       
   149         });
       
   150     }
       
   151 
       
   152     /**
       
   153      * Test timed read when bytes are available
       
   154      */
       
   155     public void testTimedRead1() throws Exception {
       
   156         withConnection((sc, peer) -> {
       
   157             peer.getOutputStream().write(99);
       
   158             Socket s = sc.socket();
       
   159             s.setSoTimeout(1000);
       
   160             int n = s.getInputStream().read();
       
   161             assertTrue(n == 99);
       
   162         });
       
   163     }
       
   164 
       
   165     /**
       
   166      * Test timed read blocking before bytes are available
       
   167      */
       
   168     public void testTimedRead2() throws Exception {
       
   169         withConnection((sc, peer) -> {
       
   170             scheduleWrite(peer.getOutputStream(), 99, 1000);
       
   171             Socket s = sc.socket();
       
   172             s.setSoTimeout(5000);
       
   173             int n = s.getInputStream().read();
       
   174             assertTrue(n == 99);
       
   175         });
       
   176     }
       
   177 
       
   178     /**
       
   179      * Test timed read when the read times out
       
   180      */
       
   181     public void testTimedRead3() throws Exception {
       
   182         withConnection((sc, peer) -> {
       
   183             Socket s = sc.socket();
       
   184             s.setSoTimeout(1000);
       
   185             InputStream in = s.getInputStream();
       
   186             expectThrows(SocketTimeoutException.class, () -> in.read());
       
   187         });
       
   188     }
       
   189 
       
   190     /**
       
   191      * Test async close of socket when thread blocked in timed read
       
   192      */
       
   193     public void testTimedRead4() throws Exception {
       
   194         withConnection((sc, peer) -> {
       
   195             scheduleClose(sc, 2000);
       
   196             Socket s = sc.socket();
       
   197             s.setSoTimeout(60*1000);
       
   198             InputStream in = s.getInputStream();
       
   199             expectThrows(IOException.class, () -> in.read());
       
   200         });
       
   201     }
       
   202 
       
   203     /**
       
   204      * Test interrupt status set before timed read
       
   205      */
       
   206     public void testTimedRead5() throws Exception {
       
   207         withConnection((sc, peer) -> {
       
   208             Socket s = sc.socket();
       
   209             Thread.currentThread().interrupt();
       
   210             try {
       
   211                 s.setSoTimeout(60*1000);
       
   212                 InputStream in = s.getInputStream();
       
   213                 expectThrows(IOException.class, () -> in.read());
       
   214             } finally {
       
   215                 Thread.interrupted();  // clear interrupt
       
   216             }
       
   217             assertTrue(s.isClosed());
       
   218         });
       
   219     }
       
   220 
       
   221     /**
       
   222      * Test interrupt of thread blocked in timed read
       
   223      */
       
   224     public void testTimedRead6() throws Exception {
       
   225         withConnection((sc, peer) -> {
       
   226             Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
       
   227             Socket s = sc.socket();
       
   228             try {
       
   229                 s.setSoTimeout(60*1000);
       
   230                 InputStream in = s.getInputStream();
       
   231                 expectThrows(IOException.class, () -> in.read());
       
   232                 assertTrue(s.isClosed());
       
   233             } finally {
       
   234                 interrupter.cancel(true);
       
   235                 Thread.interrupted();  // clear interrupt
       
   236             }
       
   237             assertTrue(s.isClosed());
       
   238         });
       
   239     }
       
   240 
       
   241     /**
       
   242      * Test async close of socket when thread blocked in write
       
   243      */
       
   244     public void testWrite1() throws Exception {
       
   245         withConnection((sc, peer) -> {
       
   246             scheduleClose(sc, 2000);
       
   247             expectThrows(IOException.class, () -> {
       
   248                 OutputStream out = sc.socket().getOutputStream();
       
   249                 byte[] data = new byte[64*1000];
       
   250                 while (true) {
       
   251                     out.write(data);
       
   252                 }
       
   253             });
       
   254         });
       
   255     }
       
   256 
       
   257     /**
       
   258      * Test interrupt status set before write
       
   259      */
       
   260     public void testWrite2() throws Exception {
       
   261         withConnection((sc, peer) -> {
       
   262             Socket s = sc.socket();
       
   263             Thread.currentThread().interrupt();
       
   264             try {
       
   265                 OutputStream out = s.getOutputStream();
       
   266                 expectThrows(IOException.class, () -> out.write(99));
       
   267             } finally {
       
   268                 Thread.interrupted();  // clear interrupt
       
   269             }
       
   270             assertTrue(s.isClosed());
       
   271         });
       
   272     }
       
   273 
       
   274     /**
       
   275      * Test interrupt of thread blocked in write
       
   276      */
       
   277     public void testWrite3() throws Exception {
       
   278         withConnection((sc, peer) -> {
       
   279             Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
       
   280             Socket s = sc.socket();
       
   281             try {
       
   282                 expectThrows(IOException.class, () -> {
       
   283                     OutputStream out = sc.socket().getOutputStream();
       
   284                     byte[] data = new byte[64*1000];
       
   285                     while (true) {
       
   286                         out.write(data);
       
   287                     }
       
   288                 });
       
   289             } finally {
       
   290                 interrupter.cancel(true);
       
   291                 Thread.interrupted();  // clear interrupt
       
   292             }
       
   293             assertTrue(s.isClosed());
       
   294         });
       
   295     }
       
   296 
       
   297     /**
       
   298      * Test write when channel is configured non-blocking
       
   299      */
       
   300     public void testWrite4() throws Exception {
       
   301         withConnection((sc, peer) -> {
       
   302             sc.configureBlocking(false);
       
   303             OutputStream out = sc.socket().getOutputStream();
       
   304             expectThrows(IllegalBlockingModeException.class, () -> out.write(99));
       
   305         });
       
   306     }
       
   307 
       
   308     /**
       
   309      * Test read when there are bytes available and another thread is blocked
       
   310      * in write
       
   311      */
       
   312     public void testConcurrentReadWrite1() throws Exception {
       
   313         withConnection((sc, peer) -> {
       
   314             Socket s = sc.socket();
       
   315 
       
   316             // block thread in write
       
   317             execute(() -> {
       
   318                 var data = new byte[64*1024];
       
   319                 OutputStream out = s.getOutputStream();
       
   320                 for (;;) {
       
   321                     out.write(data);
       
   322                 }
       
   323             });
       
   324             Thread.sleep(1000); // give writer time to block
       
   325 
       
   326             // test read when bytes are available
       
   327             peer.getOutputStream().write(99);
       
   328             int n = s.getInputStream().read();
       
   329             assertTrue(n == 99);
       
   330         });
       
   331     }
       
   332 
       
   333     /**
       
   334      * Test read blocking when another thread is blocked in write
       
   335      */
       
   336     public void testConcurrentReadWrite2() throws Exception {
       
   337         withConnection((sc, peer) -> {
       
   338             Socket s = sc.socket();
       
   339 
       
   340             // block thread in write
       
   341             execute(() -> {
       
   342                 var data = new byte[64*1024];
       
   343                 OutputStream out = s.getOutputStream();
       
   344                 for (;;) {
       
   345                     out.write(data);
       
   346                 }
       
   347             });
       
   348             Thread.sleep(1000); // give writer time to block
       
   349 
       
   350             // test read blocking until bytes are available
       
   351             scheduleWrite(peer.getOutputStream(), 99, 500);
       
   352             int n = s.getInputStream().read();
       
   353             assertTrue(n == 99);
       
   354         });
       
   355     }
       
   356 
       
   357     /**
       
   358      * Test writing when another thread is blocked in read
       
   359      */
       
   360     public void testConcurrentReadWrite3() throws Exception {
       
   361         withConnection((sc, peer) -> {
       
   362             Socket s = sc.socket();
       
   363 
       
   364             // block thread in read
       
   365             execute(() -> {
       
   366                 s.getInputStream().read();
       
   367             });
       
   368             Thread.sleep(100); // give reader time to block
       
   369 
       
   370             // test write
       
   371             s.getOutputStream().write(99);
       
   372             int n = peer.getInputStream().read();
       
   373             assertTrue(n == 99);
       
   374         });
       
   375     }
       
   376 
       
   377     /**
       
   378      * Test timed read when there are bytes available and another thread is
       
   379      * blocked in write
       
   380      */
       
   381     public void testConcurrentTimedReadWrite1() throws Exception {
       
   382         withConnection((sc, peer) -> {
       
   383             Socket s = sc.socket();
       
   384 
       
   385             // block thread in write
       
   386             execute(() -> {
       
   387                 var data = new byte[64*1024];
       
   388                 OutputStream out = s.getOutputStream();
       
   389                 for (;;) {
       
   390                     out.write(data);
       
   391                 }
       
   392             });
       
   393             Thread.sleep(1000); // give writer time to block
       
   394 
       
   395             // test read when bytes are available
       
   396             peer.getOutputStream().write(99);
       
   397             s.setSoTimeout(60*1000);
       
   398             int n = s.getInputStream().read();
       
   399             assertTrue(n == 99);
       
   400         });
       
   401     }
       
   402 
       
   403     /**
       
   404      * Test timed read blocking when another thread is blocked in write
       
   405      */
       
   406     public void testConcurrentTimedReadWrite2() throws Exception {
       
   407         withConnection((sc, peer) -> {
       
   408             Socket s = sc.socket();
       
   409 
       
   410             // block thread in write
       
   411             execute(() -> {
       
   412                 var data = new byte[64*1024];
       
   413                 OutputStream out = s.getOutputStream();
       
   414                 for (;;) {
       
   415                     out.write(data);
       
   416                 }
       
   417             });
       
   418             Thread.sleep(1000); // give writer time to block
       
   419 
       
   420             // test read blocking until bytes are available
       
   421             scheduleWrite(peer.getOutputStream(), 99, 500);
       
   422             s.setSoTimeout(60*1000);
       
   423             int n = s.getInputStream().read();
       
   424             assertTrue(n == 99);
       
   425         });
       
   426     }
       
   427 
       
   428     /**
       
   429      * Test writing when another thread is blocked in read
       
   430      */
       
   431     public void testConcurrentTimedReadWrite3() throws Exception {
       
   432         withConnection((sc, peer) -> {
       
   433             Socket s = sc.socket();
       
   434 
       
   435             // block thread in read
       
   436             execute(() -> {
       
   437                 s.setSoTimeout(60*1000);
       
   438                 s.getInputStream().read();
       
   439             });
       
   440             Thread.sleep(100); // give reader time to block
       
   441 
       
   442             // test write
       
   443             s.getOutputStream().write(99);
       
   444             int n = peer.getInputStream().read();
       
   445             assertTrue(n == 99);
       
   446         });
       
   447     }
       
   448 
       
   449     // -- test infrastructure --
       
   450 
       
   451     interface ThrowingTask {
       
   452         void run() throws Exception;
       
   453     }
       
   454 
       
   455     interface ThrowingBiConsumer<T, U> {
       
   456         void accept(T t, U u) throws Exception;
       
   457     }
       
   458 
       
   459     /**
       
   460      * Invokes the consumer with a connected pair of socket channel and socket
       
   461      */
       
   462     static void withConnection(ThrowingBiConsumer<SocketChannel, Socket> consumer)
       
   463         throws Exception
       
   464     {
       
   465         try (ServerSocket ss = new ServerSocket(0);
       
   466              SocketChannel sc = SocketChannel.open(ss.getLocalSocketAddress());
       
   467              Socket peer = ss.accept()) {
       
   468             consumer.accept(sc, peer);
       
   469         }
       
   470     }
       
   471 
       
   472     static Future<?> scheduleWrite(OutputStream out, byte[] data, long delay) {
       
   473         return schedule(() -> {
       
   474             try {
       
   475                 out.write(data);
       
   476             } catch (IOException ioe) { }
       
   477         }, delay);
       
   478     }
       
   479 
       
   480     static Future<?> scheduleWrite(OutputStream out, int b, long delay) {
       
   481         return scheduleWrite(out, new byte[] { (byte)b }, delay);
       
   482     }
       
   483 
       
   484     static Future<?> scheduleClose(Closeable c, long delay) {
       
   485         return schedule(() -> {
       
   486             try {
       
   487                 c.close();
       
   488             } catch (IOException ioe) { }
       
   489         }, delay);
       
   490     }
       
   491 
       
   492     static Future<?> scheduleInterrupt(Thread t, long delay) {
       
   493         return schedule(() -> t.interrupt(), delay);
       
   494     }
       
   495 
       
   496     static Future<?> schedule(Runnable task, long delay) {
       
   497         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
       
   498         try {
       
   499             return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
       
   500         } finally {
       
   501             executor.shutdown();
       
   502         }
       
   503     }
       
   504 
       
   505     static Future<?> execute(ThrowingTask task) {
       
   506         ExecutorService pool = Executors.newFixedThreadPool(1);
       
   507         try {
       
   508             return pool.submit(() -> {
       
   509                 task.run();
       
   510                 return null;
       
   511             });
       
   512         } finally {
       
   513             pool.shutdown();
       
   514         }
       
   515     }
       
   516 }