test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
changeset 50602 ed8de3d0cd28
child 51209 4630bb315ec0
equal deleted inserted replaced
50601:3fbae7b9ddb5 50602:ed8de3d0cd28
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /* @test
       
    25  * @bug 8199433
       
    26  * @run testng SelectWithConsumer
       
    27  * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
       
    28  *          selectNow(Consumer)
       
    29  */
       
    30 
       
    31 import java.io.Closeable;
       
    32 import java.io.IOException;
       
    33 import java.net.InetSocketAddress;
       
    34 import java.nio.ByteBuffer;
       
    35 import java.nio.channels.ClosedSelectorException;
       
    36 import java.nio.channels.Pipe;
       
    37 import java.nio.channels.SelectionKey;
       
    38 import java.nio.channels.Selector;
       
    39 import java.nio.channels.ServerSocketChannel;
       
    40 import java.nio.channels.SocketChannel;
       
    41 import java.nio.channels.WritableByteChannel;
       
    42 import java.util.concurrent.Executors;
       
    43 import java.util.concurrent.ScheduledExecutorService;
       
    44 import java.util.concurrent.TimeUnit;
       
    45 import java.util.concurrent.atomic.AtomicInteger;
       
    46 import static java.util.concurrent.TimeUnit.*;
       
    47 
       
    48 import org.testng.annotations.AfterTest;
       
    49 import org.testng.annotations.Test;
       
    50 import static org.testng.Assert.*;
       
    51 
       
    52 @Test
       
    53 public class SelectWithConsumer {
       
    54 
       
    55     /**
       
    56      * Invoke the select methods that take an action and check that the
       
    57      * accumulated ready ops notified to the action matches the expected ops.
       
    58      */
       
    59     void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
       
    60         var callerThread = Thread.currentThread();
       
    61         var sel = key.selector();
       
    62         var interestOps = key.interestOps();
       
    63         var notifiedOps = new AtomicInteger();
       
    64 
       
    65         // select(Consumer)
       
    66         if (expectedOps == 0)
       
    67             sel.wakeup(); // ensure select does not block
       
    68         notifiedOps.set(0);
       
    69         int n = sel.select(k -> {
       
    70             assertTrue(Thread.currentThread() == callerThread);
       
    71             assertTrue(k == key);
       
    72             int readyOps = key.readyOps();
       
    73             assertTrue((readyOps & interestOps) != 0);
       
    74             assertTrue((readyOps & notifiedOps.get()) == 0);
       
    75             notifiedOps.set(notifiedOps.get() | readyOps);
       
    76         });
       
    77         assertTrue((n == 1) ^ (expectedOps == 0));
       
    78         assertTrue(notifiedOps.get() == expectedOps);
       
    79 
       
    80         // select(Consumer, timeout)
       
    81         notifiedOps.set(0);
       
    82         n = sel.select(k -> {
       
    83             assertTrue(Thread.currentThread() == callerThread);
       
    84             assertTrue(k == key);
       
    85             int readyOps = key.readyOps();
       
    86             assertTrue((readyOps & interestOps) != 0);
       
    87             assertTrue((readyOps & notifiedOps.get()) == 0);
       
    88             notifiedOps.set(notifiedOps.get() | readyOps);
       
    89         }, 1000);
       
    90         assertTrue((n == 1) ^ (expectedOps == 0));
       
    91         assertTrue(notifiedOps.get() == expectedOps);
       
    92 
       
    93         // selectNow(Consumer)
       
    94         notifiedOps.set(0);
       
    95         n = sel.selectNow(k -> {
       
    96             assertTrue(Thread.currentThread() == callerThread);
       
    97             assertTrue(k == key);
       
    98             int readyOps = key.readyOps();
       
    99             assertTrue((readyOps & interestOps) != 0);
       
   100             assertTrue((readyOps & notifiedOps.get()) == 0);
       
   101             notifiedOps.set(notifiedOps.get() | readyOps);
       
   102         });
       
   103         assertTrue((n == 1) ^ (expectedOps == 0));
       
   104         assertTrue(notifiedOps.get() == expectedOps);
       
   105     }
       
   106 
       
   107     /**
       
   108      * Test that an action is performed when a channel is ready for reading.
       
   109      */
       
   110     public void testReadable() throws Exception {
       
   111         Pipe p = Pipe.open();
       
   112         try (Selector sel = Selector.open()) {
       
   113             Pipe.SinkChannel sink = p.sink();
       
   114             Pipe.SourceChannel source = p.source();
       
   115             source.configureBlocking(false);
       
   116             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
       
   117 
       
   118             // write to sink to ensure source is readable
       
   119             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
       
   120 
       
   121             // test that action is invoked
       
   122             testActionInvoked(key, SelectionKey.OP_READ);
       
   123         } finally {
       
   124             closePipe(p);
       
   125         }
       
   126     }
       
   127 
       
   128     /**
       
   129      * Test that an action is performed when a channel is ready for writing.
       
   130      */
       
   131     public void testWritable() throws Exception {
       
   132         Pipe p = Pipe.open();
       
   133         try (Selector sel = Selector.open()) {
       
   134             Pipe.SourceChannel source = p.source();
       
   135             Pipe.SinkChannel sink = p.sink();
       
   136             sink.configureBlocking(false);
       
   137             SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
       
   138 
       
   139             // test that action is invoked
       
   140             testActionInvoked(key, SelectionKey.OP_WRITE);
       
   141         } finally {
       
   142             closePipe(p);
       
   143         }
       
   144     }
       
   145 
       
   146     /**
       
   147      * Test that an action is performed when a channel is ready for both
       
   148      * reading and writing.
       
   149      */
       
   150     public void testReadableAndWriteable() throws Exception {
       
   151         ServerSocketChannel ssc = null;
       
   152         SocketChannel sc = null;
       
   153         SocketChannel peer = null;
       
   154         try (Selector sel = Selector.open()) {
       
   155             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
       
   156             sc = SocketChannel.open(ssc.getLocalAddress());
       
   157             sc.configureBlocking(false);
       
   158             SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
       
   159                                                  SelectionKey.OP_WRITE));
       
   160 
       
   161             // accept connection and write data so the source is readable
       
   162             peer = ssc.accept();
       
   163             peer.write(messageBuffer());
       
   164 
       
   165             // test that action is invoked
       
   166             testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
       
   167         } finally {
       
   168             if (ssc != null) ssc.close();
       
   169             if (sc != null) sc.close();
       
   170             if (peer != null) peer.close();
       
   171         }
       
   172     }
       
   173 
       
   174     /**
       
   175      * Test that the action is called for two selected channels
       
   176      */
       
   177     public void testTwoChannels() throws Exception {
       
   178         Pipe p = Pipe.open();
       
   179         try (Selector sel = Selector.open()) {
       
   180             Pipe.SourceChannel source = p.source();
       
   181             Pipe.SinkChannel sink = p.sink();
       
   182             source.configureBlocking(false);
       
   183             sink.configureBlocking(false);
       
   184             SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
       
   185             SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
       
   186 
       
   187             // write to sink to ensure that the source is readable
       
   188             sink.write(messageBuffer());
       
   189 
       
   190             var counter = new AtomicInteger();
       
   191 
       
   192             // select(Consumer)
       
   193             counter.set(0);
       
   194             int n = sel.select(k -> {
       
   195                 counter.incrementAndGet();
       
   196                 if (k == key1) {
       
   197                     assertTrue(k.isReadable());
       
   198                 } else if (k == key2) {
       
   199                     assertTrue(k.isWritable());
       
   200                 } else {
       
   201                     assertTrue(false);
       
   202                 }
       
   203             });
       
   204             assertTrue(n == 2);
       
   205             assertTrue(counter.get() == 2);
       
   206 
       
   207             // select(Consumer, timeout)
       
   208             counter.set(0);
       
   209             n = sel.select(k -> {
       
   210                 counter.incrementAndGet();
       
   211                 if (k == key1) {
       
   212                     assertTrue(k.isReadable());
       
   213                 } else if (k == key2) {
       
   214                     assertTrue(k.isWritable());
       
   215                 } else {
       
   216                     assertTrue(false);
       
   217                 }
       
   218             }, 1000);
       
   219             assertTrue(n == 2);
       
   220             assertTrue(counter.get() == 2);
       
   221 
       
   222             // selectNow(Consumer)
       
   223             counter.set(0);
       
   224             n = sel.selectNow(k -> {
       
   225                 counter.incrementAndGet();
       
   226                 if (k == key1) {
       
   227                     assertTrue(k.isReadable());
       
   228                 } else if (k == key2) {
       
   229                     assertTrue(k.isWritable());
       
   230                 } else {
       
   231                     assertTrue(false);
       
   232                 }
       
   233             });
       
   234             assertTrue(n == 2);
       
   235             assertTrue(counter.get() == 2);
       
   236         } finally {
       
   237             closePipe(p);
       
   238         }
       
   239     }
       
   240 
       
   241     /**
       
   242      * Test calling select twice, the action should be invoked each time
       
   243      */
       
   244     public void testRepeatedSelect1() throws Exception {
       
   245         Pipe p = Pipe.open();
       
   246         try (Selector sel = Selector.open()) {
       
   247             Pipe.SourceChannel source = p.source();
       
   248             Pipe.SinkChannel sink = p.sink();
       
   249             source.configureBlocking(false);
       
   250             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
       
   251 
       
   252             // write to sink to ensure that the source is readable
       
   253             sink.write(messageBuffer());
       
   254 
       
   255             // test that action is invoked
       
   256             testActionInvoked(key, SelectionKey.OP_READ);
       
   257             testActionInvoked(key, SelectionKey.OP_READ);
       
   258 
       
   259         } finally {
       
   260             closePipe(p);
       
   261         }
       
   262     }
       
   263 
       
   264     /**
       
   265      * Test calling select twice. An I/O operation is performed after the
       
   266      * first select so the channel will not be selected by the second select.
       
   267      */
       
   268     public void testRepeatedSelect2() throws Exception {
       
   269         Pipe p = Pipe.open();
       
   270         try (Selector sel = Selector.open()) {
       
   271             Pipe.SourceChannel source = p.source();
       
   272             Pipe.SinkChannel sink = p.sink();
       
   273             source.configureBlocking(false);
       
   274             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
       
   275 
       
   276             // write to sink to ensure that the source is readable
       
   277             sink.write(messageBuffer());
       
   278 
       
   279             // test that action is invoked
       
   280             testActionInvoked(key, SelectionKey.OP_READ);
       
   281 
       
   282             // read all bytes
       
   283             int n;
       
   284             ByteBuffer bb = ByteBuffer.allocate(100);
       
   285             do {
       
   286                 n = source.read(bb);
       
   287                 bb.clear();
       
   288             } while (n > 0);
       
   289 
       
   290             // test that action is not invoked
       
   291             testActionInvoked(key, 0);
       
   292         } finally {
       
   293             closePipe(p);
       
   294         }
       
   295     }
       
   296 
       
   297     /**
       
   298      * Test timeout
       
   299      */
       
   300     public void testTimeout() throws Exception {
       
   301         Pipe p = Pipe.open();
       
   302         try (Selector sel = Selector.open()) {
       
   303             Pipe.SourceChannel source = p.source();
       
   304             Pipe.SinkChannel sink = p.sink();
       
   305             source.configureBlocking(false);
       
   306             source.register(sel, SelectionKey.OP_READ);
       
   307             long start = System.currentTimeMillis();
       
   308             int n = sel.select(k -> assertTrue(false), 1000L);
       
   309             long duration = System.currentTimeMillis() - start;
       
   310             assertTrue(n == 0);
       
   311             assertTrue(duration > 500, "select took " + duration + " ms");
       
   312         } finally {
       
   313             closePipe(p);
       
   314         }
       
   315     }
       
   316 
       
   317     /**
       
   318      * Test wakeup prior to select
       
   319      */
       
   320     public void testWakeupBeforeSelect() throws Exception {
       
   321         // select(Consumer)
       
   322         try (Selector sel = Selector.open()) {
       
   323             sel.wakeup();
       
   324             int n = sel.select(k -> assertTrue(false));
       
   325             assertTrue(n == 0);
       
   326         }
       
   327 
       
   328         // select(Consumer, timeout)
       
   329         try (Selector sel = Selector.open()) {
       
   330             sel.wakeup();
       
   331             long start = System.currentTimeMillis();
       
   332             int n = sel.select(k -> assertTrue(false), 60*1000);
       
   333             long duration = System.currentTimeMillis() - start;
       
   334             assertTrue(n == 0);
       
   335             assertTrue(duration < 5000, "select took " + duration + " ms");
       
   336         }
       
   337     }
       
   338 
       
   339     /**
       
   340      * Test wakeup during select
       
   341      */
       
   342     public void testWakeupDuringSelect() throws Exception {
       
   343         // select(Consumer)
       
   344         try (Selector sel = Selector.open()) {
       
   345             scheduleWakeup(sel, 1, SECONDS);
       
   346             int n = sel.select(k -> assertTrue(false));
       
   347             assertTrue(n == 0);
       
   348         }
       
   349 
       
   350         // select(Consumer, timeout)
       
   351         try (Selector sel = Selector.open()) {
       
   352             scheduleWakeup(sel, 1, SECONDS);
       
   353             long start = System.currentTimeMillis();
       
   354             int n = sel.select(k -> assertTrue(false), 60*1000);
       
   355             long duration = System.currentTimeMillis() - start;
       
   356             assertTrue(n == 0);
       
   357             assertTrue(duration > 500 && duration < 10*1000,
       
   358                     "select took " + duration + " ms");
       
   359         }
       
   360     }
       
   361 
       
   362     /**
       
   363      * Test invoking select with interrupt status set
       
   364      */
       
   365     public void testInterruptBeforeSelect() throws Exception {
       
   366         // select(Consumer)
       
   367         try (Selector sel = Selector.open()) {
       
   368             Thread.currentThread().interrupt();
       
   369             int n = sel.select(k -> assertTrue(false));
       
   370             assertTrue(n == 0);
       
   371             assertTrue(Thread.currentThread().isInterrupted());
       
   372             assertTrue(sel.isOpen());
       
   373         } finally {
       
   374             Thread.currentThread().interrupted();  // clear interrupt status
       
   375         }
       
   376 
       
   377         // select(Consumer, timeout)
       
   378         try (Selector sel = Selector.open()) {
       
   379             Thread.currentThread().interrupt();
       
   380             long start = System.currentTimeMillis();
       
   381             int n = sel.select(k -> assertTrue(false), 60*1000);
       
   382             long duration = System.currentTimeMillis() - start;
       
   383             assertTrue(n == 0);
       
   384             assertTrue(duration < 5000, "select took " + duration + " ms");
       
   385             assertTrue(Thread.currentThread().isInterrupted());
       
   386             assertTrue(sel.isOpen());
       
   387         } finally {
       
   388             Thread.currentThread().interrupted();  // clear interrupt status
       
   389         }
       
   390     }
       
   391 
       
   392     /**
       
   393      * Test interrupt thread during select
       
   394      */
       
   395     public void testInterruptDuringSelect() throws Exception {
       
   396         // select(Consumer)
       
   397         try (Selector sel = Selector.open()) {
       
   398             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
       
   399             int n = sel.select(k -> assertTrue(false));
       
   400             assertTrue(n == 0);
       
   401             assertTrue(Thread.currentThread().isInterrupted());
       
   402             assertTrue(sel.isOpen());
       
   403         } finally {
       
   404             Thread.currentThread().interrupted();  // clear interrupt status
       
   405         }
       
   406 
       
   407         // select(Consumer, timeout)
       
   408         try (Selector sel = Selector.open()) {
       
   409             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
       
   410             long start = System.currentTimeMillis();
       
   411             int n = sel.select(k -> assertTrue(false), 60*1000);
       
   412             long duration = System.currentTimeMillis() - start;
       
   413             assertTrue(n == 0);
       
   414             assertTrue(duration > 500 && duration < 5000,
       
   415                     "select took " + duration + " ms");
       
   416             assertTrue(Thread.currentThread().isInterrupted());
       
   417             assertTrue(sel.isOpen());
       
   418         } finally {
       
   419             Thread.currentThread().interrupted();  // clear interrupt status
       
   420         }
       
   421     }
       
   422 
       
   423     /**
       
   424      * Test invoking select on a closed selector
       
   425      */
       
   426     @Test(expectedExceptions = ClosedSelectorException.class)
       
   427     public void testClosedSelector1() throws Exception {
       
   428         Selector sel = Selector.open();
       
   429         sel.close();
       
   430         sel.select(k -> assertTrue(false));
       
   431     }
       
   432     @Test(expectedExceptions = ClosedSelectorException.class)
       
   433     public void testClosedSelector2() throws Exception {
       
   434         Selector sel = Selector.open();
       
   435         sel.close();
       
   436         sel.select(k -> assertTrue(false), 1000);
       
   437     }
       
   438     @Test(expectedExceptions = ClosedSelectorException.class)
       
   439     public void testClosedSelector3() throws Exception {
       
   440         Selector sel = Selector.open();
       
   441         sel.close();
       
   442         sel.selectNow(k -> assertTrue(false));
       
   443     }
       
   444 
       
   445     /**
       
   446      * Test closing selector while in a selection operation
       
   447      */
       
   448     public void testCloseDuringSelect() throws Exception {
       
   449         // select(Consumer)
       
   450         try (Selector sel = Selector.open()) {
       
   451             scheduleClose(sel, 3, SECONDS);
       
   452             int n = sel.select(k -> assertTrue(false));
       
   453             assertTrue(n == 0);
       
   454             assertFalse(sel.isOpen());
       
   455         }
       
   456 
       
   457         // select(Consumer, timeout)
       
   458         try (Selector sel = Selector.open()) {
       
   459             scheduleClose(sel, 3, SECONDS);
       
   460             long start = System.currentTimeMillis();
       
   461             int n = sel.select(k -> assertTrue(false), 60*1000);
       
   462             long duration = System.currentTimeMillis() - start;
       
   463             assertTrue(n == 0);
       
   464             assertTrue(duration > 2000 && duration < 10*1000,
       
   465                     "select took " + duration + " ms");
       
   466             assertFalse(sel.isOpen());
       
   467         }
       
   468     }
       
   469 
       
   470     /**
       
   471      * Test action closing selector
       
   472      */
       
   473     @Test(expectedExceptions = ClosedSelectorException.class)
       
   474     public void testActionClosingSelector() throws Exception {
       
   475         Pipe p = Pipe.open();
       
   476         try (Selector sel = Selector.open()) {
       
   477             Pipe.SourceChannel source = p.source();
       
   478             Pipe.SinkChannel sink = p.sink();
       
   479             source.configureBlocking(false);
       
   480             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
       
   481 
       
   482             // write to sink to ensure that the source is readable
       
   483             sink.write(messageBuffer());
       
   484 
       
   485             // should relay ClosedSelectorException
       
   486             sel.select(k -> {
       
   487                 assertTrue(k == key);
       
   488                 try {
       
   489                     sel.close();
       
   490                 } catch (IOException ioe) { }
       
   491             });
       
   492         } finally {
       
   493             closePipe(p);
       
   494         }
       
   495     }
       
   496 
       
   497     /**
       
   498      * Test that the action is invoked while synchronized on the selector and
       
   499      * its selected-key set.
       
   500      */
       
   501     public void testLocks() throws Exception {
       
   502         Pipe p = Pipe.open();
       
   503         try (Selector sel = Selector.open()) {
       
   504             Pipe.SourceChannel source = p.source();
       
   505             Pipe.SinkChannel sink = p.sink();
       
   506             source.configureBlocking(false);
       
   507             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
       
   508 
       
   509             // write to sink to ensure that the source is readable
       
   510             sink.write(messageBuffer());
       
   511 
       
   512             // select(Consumer)
       
   513             sel.select(k -> {
       
   514                 assertTrue(k == key);
       
   515                 assertTrue(Thread.holdsLock(sel));
       
   516                 assertFalse(Thread.holdsLock(sel.keys()));
       
   517                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
       
   518             });
       
   519 
       
   520             // select(Consumer, timeout)
       
   521             sel.select(k -> {
       
   522                 assertTrue(k == key);
       
   523                 assertTrue(Thread.holdsLock(sel));
       
   524                 assertFalse(Thread.holdsLock(sel.keys()));
       
   525                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
       
   526             }, 1000L);
       
   527 
       
   528             // selectNow(Consumer)
       
   529             sel.selectNow(k -> {
       
   530                 assertTrue(k == key);
       
   531                 assertTrue(Thread.holdsLock(sel));
       
   532                 assertFalse(Thread.holdsLock(sel.keys()));
       
   533                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
       
   534             });
       
   535         } finally {
       
   536             closePipe(p);
       
   537         }
       
   538     }
       
   539 
       
   540     /**
       
   541      * Test that selection operations remove cancelled keys from the selector's
       
   542      * key and selected-key sets.
       
   543      */
       
   544     public void testCancel() throws Exception {
       
   545         Pipe p = Pipe.open();
       
   546         try (Selector sel = Selector.open()) {
       
   547             Pipe.SinkChannel sink = p.sink();
       
   548             Pipe.SourceChannel source = p.source();
       
   549 
       
   550             // write to sink to ensure that the source is readable
       
   551             sink.write(messageBuffer());
       
   552 
       
   553             sink.configureBlocking(false);
       
   554             source.configureBlocking(false);
       
   555             SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE);
       
   556             SelectionKey key2 = source.register(sel, SelectionKey.OP_READ);
       
   557 
       
   558             sel.selectNow();
       
   559             assertTrue(sel.keys().contains(key1));
       
   560             assertTrue(sel.keys().contains(key2));
       
   561             assertTrue(sel.selectedKeys().contains(key1));
       
   562             assertTrue(sel.selectedKeys().contains(key2));
       
   563 
       
   564             // cancel key1
       
   565             key1.cancel();
       
   566             int n = sel.selectNow(k -> assertTrue(k == key2));
       
   567             assertTrue(n == 1);
       
   568             assertFalse(sel.keys().contains(key1));
       
   569             assertTrue(sel.keys().contains(key2));
       
   570             assertFalse(sel.selectedKeys().contains(key1));
       
   571             assertTrue(sel.selectedKeys().contains(key2));
       
   572 
       
   573             // cancel key2
       
   574             key2.cancel();
       
   575             n = sel.selectNow(k -> assertTrue(false));
       
   576             assertTrue(n == 0);
       
   577             assertFalse(sel.keys().contains(key1));
       
   578             assertFalse(sel.keys().contains(key2));
       
   579             assertFalse(sel.selectedKeys().contains(key1));
       
   580             assertFalse(sel.selectedKeys().contains(key2));
       
   581         } finally {
       
   582             closePipe(p);
       
   583         }
       
   584     }
       
   585 
       
   586     /**
       
   587      * Test an action invoking select()
       
   588      */
       
   589     public void testReentrantSelect1() throws Exception {
       
   590         Pipe p = Pipe.open();
       
   591         try (Selector sel = Selector.open()) {
       
   592             Pipe.SinkChannel sink = p.sink();
       
   593             Pipe.SourceChannel source = p.source();
       
   594             source.configureBlocking(false);
       
   595             source.register(sel, SelectionKey.OP_READ);
       
   596 
       
   597             // write to sink to ensure that the source is readable
       
   598             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
       
   599 
       
   600             int n = sel.select(k -> {
       
   601                 try {
       
   602                     sel.select();
       
   603                     assertTrue(false);
       
   604                 } catch (IOException ioe) {
       
   605                     throw new RuntimeException(ioe);
       
   606                 } catch (IllegalStateException expected) {
       
   607                 }
       
   608             });
       
   609             assertTrue(n == 1);
       
   610         } finally {
       
   611             closePipe(p);
       
   612         }
       
   613     }
       
   614 
       
   615     /**
       
   616      * Test an action invoking selectNow()
       
   617      */
       
   618     public void testReentrantSelect2() throws Exception {
       
   619         Pipe p = Pipe.open();
       
   620         try (Selector sel = Selector.open()) {
       
   621             Pipe.SinkChannel sink = p.sink();
       
   622             Pipe.SourceChannel source = p.source();
       
   623 
       
   624             // write to sink to ensure that the source is readable
       
   625             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
       
   626 
       
   627             source.configureBlocking(false);
       
   628             source.register(sel, SelectionKey.OP_READ);
       
   629             int n = sel.select(k -> {
       
   630                 try {
       
   631                     sel.selectNow();
       
   632                     assertTrue(false);
       
   633                 } catch (IOException ioe) {
       
   634                     throw new RuntimeException(ioe);
       
   635                 } catch (IllegalStateException expected) {
       
   636                 }
       
   637             });
       
   638             assertTrue(n == 1);
       
   639         } finally {
       
   640             closePipe(p);
       
   641         }
       
   642     }
       
   643 
       
   644     /**
       
   645      * Test an action invoking select(Consumer)
       
   646      */
       
   647     public void testReentrantSelect3() throws Exception {
       
   648         Pipe p = Pipe.open();
       
   649         try (Selector sel = Selector.open()) {
       
   650             Pipe.SinkChannel sink = p.sink();
       
   651             Pipe.SourceChannel source = p.source();
       
   652 
       
   653             // write to sink to ensure that the source is readable
       
   654             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
       
   655 
       
   656             source.configureBlocking(false);
       
   657             source.register(sel, SelectionKey.OP_READ);
       
   658             int n = sel.select(k -> {
       
   659                 try {
       
   660                     sel.select(x -> assertTrue(false));
       
   661                     assertTrue(false);
       
   662                 } catch (IOException ioe) {
       
   663                     throw new RuntimeException(ioe);
       
   664                 } catch (IllegalStateException expected) {
       
   665                 }
       
   666             });
       
   667             assertTrue(n == 1);
       
   668         } finally {
       
   669             closePipe(p);
       
   670         }
       
   671     }
       
   672 
       
   673     /**
       
   674      * Negative timeout
       
   675      */
       
   676     @Test(expectedExceptions = IllegalArgumentException.class)
       
   677     public void testNegativeTimeout() throws Exception {
       
   678         try (Selector sel = Selector.open()) {
       
   679             sel.select(k -> { }, -1L);
       
   680         }
       
   681     }
       
   682 
       
   683     /**
       
   684      * Null action
       
   685      */
       
   686     @Test(expectedExceptions = NullPointerException.class)
       
   687     public void testNull1() throws Exception {
       
   688         try (Selector sel = Selector.open()) {
       
   689             sel.select(null);
       
   690         }
       
   691     }
       
   692     @Test(expectedExceptions = NullPointerException.class)
       
   693     public void testNull2() throws Exception {
       
   694         try (Selector sel = Selector.open()) {
       
   695             sel.select(null, 1000);
       
   696         }
       
   697     }
       
   698     @Test(expectedExceptions = NullPointerException.class)
       
   699     public void testNull3() throws Exception {
       
   700         try (Selector sel = Selector.open()) {
       
   701             sel.selectNow(null);
       
   702         }
       
   703     }
       
   704 
       
   705 
       
   706     // -- support methods ---
       
   707 
       
   708     private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
       
   709 
       
   710     @AfterTest
       
   711     void shutdownThreadPool() {
       
   712         POOL.shutdown();
       
   713     }
       
   714 
       
   715     void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
       
   716         POOL.schedule(() -> sel.wakeup(), delay, unit);
       
   717     }
       
   718 
       
   719     void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
       
   720         POOL.schedule(() -> t.interrupt(), delay, unit);
       
   721     }
       
   722 
       
   723     void scheduleClose(Closeable c, long delay, TimeUnit unit) {
       
   724         POOL.schedule(() -> {
       
   725             try {
       
   726                 c.close();
       
   727             } catch (IOException ioe) {
       
   728                 ioe.printStackTrace();
       
   729             }
       
   730         }, delay, unit);
       
   731     }
       
   732 
       
   733     void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
       
   734         POOL.schedule(() -> {
       
   735             try {
       
   736                 sink.write(buf);
       
   737             } catch (IOException ioe) {
       
   738                 ioe.printStackTrace();
       
   739             }
       
   740         }, delay, unit);
       
   741     }
       
   742 
       
   743     static void closePipe(Pipe p) {
       
   744         try { p.sink().close(); } catch (IOException ignore) { }
       
   745         try { p.source().close(); } catch (IOException ignore) { }
       
   746     }
       
   747 
       
   748     static ByteBuffer messageBuffer() {
       
   749         try {
       
   750             return ByteBuffer.wrap("message".getBytes("UTF-8"));
       
   751         } catch (Exception e) {
       
   752             throw new RuntimeException(e);
       
   753         }
       
   754     }
       
   755 }