21 * questions. |
21 * questions. |
22 */ |
22 */ |
23 |
23 |
24 /* @test |
24 /* @test |
25 * @bug 8201315 |
25 * @bug 8201315 |
|
26 * @build SelectorUtils |
|
27 * @run main RegisterDuringSelect |
26 * @summary Test that channels can be registered, interest ops can be changed, |
28 * @summary Test that channels can be registered, interest ops can be changed, |
27 * and keys cancelled while a selection operation is in progress. |
29 * and keys cancelled while a selection operation is in progress. |
28 */ |
30 */ |
29 |
31 |
30 import java.io.IOException; |
32 import java.io.IOException; |
31 import java.nio.channels.ClosedSelectorException; |
|
32 import java.nio.channels.Pipe; |
33 import java.nio.channels.Pipe; |
33 import java.nio.channels.SelectionKey; |
34 import java.nio.channels.SelectionKey; |
34 import java.nio.channels.Selector; |
35 import java.nio.channels.Selector; |
35 import java.util.concurrent.Callable; |
|
36 import java.util.concurrent.ExecutorService; |
|
37 import java.util.concurrent.Executors; |
|
38 import java.util.concurrent.Future; |
|
39 import java.util.concurrent.Phaser; |
|
40 |
36 |
41 public class RegisterDuringSelect { |
37 public class RegisterDuringSelect { |
|
38 interface TestOperation { |
|
39 void accept(Thread t, Selector sel, Pipe.SourceChannel sc) throws Exception; |
|
40 } |
|
41 static class Test { |
|
42 final Selector sel; |
|
43 final Pipe p; |
|
44 final Pipe.SourceChannel sc; |
42 |
45 |
43 static Callable<Void> selectLoop(Selector sel, Phaser barrier) { |
46 Test() throws Exception { |
44 return new Callable<Void>() { |
47 sel = Selector.open(); |
45 @Override |
48 p = Pipe.open(); |
46 public Void call() throws IOException { |
49 sc = p.source(); |
47 for (;;) { |
50 sc.configureBlocking(false); |
|
51 } |
|
52 void test(TestOperation op) throws Exception { |
|
53 try { |
|
54 Thread t = new Thread(() -> { |
48 try { |
55 try { |
49 sel.select(); |
56 sel.select(); |
50 } catch (ClosedSelectorException ignore) { |
57 } catch (IOException ex) { |
51 return null; |
58 throw new RuntimeException(ex); |
52 } |
59 } |
53 if (sel.isOpen()) { |
60 }); |
54 barrier.arriveAndAwaitAdvance(); |
61 t.start(); |
55 System.out.println("selectLoop advanced ..."); |
62 op.accept(t, sel, sc); |
56 } else { |
63 } finally { |
57 // closed |
64 sel.close(); |
58 return null; |
65 p.source().close(); |
59 } |
66 p.sink().close(); |
60 } |
|
61 } |
67 } |
62 }; |
68 } |
63 } |
69 } |
64 /** |
70 /** |
65 * Invoke register, interestOps, and cancel concurrently with a thread |
71 * Invoke register, interestOps, and cancel concurrently with a thread |
66 * doing a selection operation |
72 * doing a selection operation |
67 */ |
73 */ |
68 public static void main(String args[]) throws Exception { |
74 public static void main(String args[]) throws Exception { |
69 Future<Void> result; |
75 new Test().test((t, sel, sc) -> { |
70 |
76 System.out.println("register ..."); |
71 ExecutorService pool = Executors.newFixedThreadPool(1); |
77 // spin to make sure select has been invoked |
72 try (Selector sel = Selector.open()) { |
78 SelectorUtils.spinUntilLocked(t, sel); |
73 Phaser barrier = new Phaser(2); |
79 SelectionKey key = sc.register(sel, SelectionKey.OP_READ); |
74 |
|
75 // submit task to do the select loop |
|
76 result = pool.submit(selectLoop(sel, barrier)); |
|
77 |
|
78 Pipe p = Pipe.open(); |
|
79 try { |
80 try { |
80 Pipe.SourceChannel sc = p.source(); |
|
81 sc.configureBlocking(false); |
|
82 |
|
83 System.out.println("register ..."); |
|
84 SelectionKey key = sc.register(sel, SelectionKey.OP_READ); |
|
85 if (!sel.keys().contains(key)) |
81 if (!sel.keys().contains(key)) |
86 throw new RuntimeException("key not in key set"); |
82 throw new RuntimeException("key not in key set"); |
|
83 } finally { |
87 sel.wakeup(); |
84 sel.wakeup(); |
88 barrier.arriveAndAwaitAdvance(); |
85 t.join(); |
89 |
86 } |
90 System.out.println("interestOps ..."); |
87 }); |
91 key.interestOps(0); |
88 new Test().test((t, sel, sc) -> { |
|
89 System.out.println("interestOps ..."); |
|
90 SelectionKey key = sc.register(sel, SelectionKey.OP_READ); |
|
91 // spin to make sure select has been invoked |
|
92 SelectorUtils.spinUntilLocked(t, sel); |
|
93 key.interestOps(0); |
|
94 try { |
|
95 if (key.interestOps() != 0) |
|
96 throw new RuntimeException("interested ops not cleared"); |
|
97 } finally { |
92 sel.wakeup(); |
98 sel.wakeup(); |
93 barrier.arriveAndAwaitAdvance(); |
99 t.join(); |
94 |
|
95 System.out.println("cancel ..."); |
|
96 key.cancel(); |
|
97 sel.wakeup(); |
|
98 barrier.arriveAndAwaitAdvance(); |
|
99 if (sel.keys().contains(key)) |
|
100 throw new RuntimeException("key not removed from key set"); |
|
101 |
|
102 } finally { |
|
103 p.source().close(); |
|
104 p.sink().close(); |
|
105 } |
100 } |
106 |
101 }); |
107 } finally { |
102 new Test().test((t, sel, sc) -> { |
108 pool.shutdown(); |
103 System.out.println("cancel ..."); |
109 } |
104 SelectionKey key = sc.register(sel, SelectionKey.OP_READ); |
110 |
105 // spin to make sure select has been invoked |
111 // ensure selectLoop completes without exception |
106 SelectorUtils.spinUntilLocked(t, sel); |
112 result.get(); |
107 key.cancel(); |
113 |
108 sel.wakeup(); |
|
109 t.join(); |
|
110 if (sel.keys().contains(key)) |
|
111 throw new RuntimeException("key not removed from key set"); |
|
112 }); |
114 } |
113 } |
115 } |
114 } |
116 |
|