--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/nio/channels/Selector/SelectorTest.java Tue Sep 12 19:03:39 2017 +0200
@@ -0,0 +1,413 @@
+/*
+ * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @summary Test selectors and socketchannels
+ * @library ..
+ * @key randomness
+ */
+
+import java.io.*;
+import java.net.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.*;
+
+
+public class SelectorTest {
+ private static List clientList = new LinkedList();
+ private static Random rnd = new Random();
+ public static int NUM_CLIENTS = 30;
+ public static int TEST_PORT = 31452;
+ static PrintStream log = System.err;
+ private static int FINISH_TIME = 30000;
+
+ /*
+ * Usage note
+ *
+ * java SelectorTest [server] [client <host>] [<port>]
+ *
+ * No arguments runs both client and server in separate threads
+ * using the default port of 31452.
+ *
+ * client runs the client on this machine and connects to server
+ * at the given IP address.
+ *
+ * server runs the server on localhost.
+ */
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ Server server = new Server(0);
+ server.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) { }
+ InetSocketAddress isa
+ = new InetSocketAddress(InetAddress.getLocalHost(), server.port());
+ Client client = new Client(isa);
+ client.start();
+ if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0)
+ throw new Exception("Failure");
+ log.println();
+
+ } else if (args[0].equals("server")) {
+
+ if (args.length > 1)
+ TEST_PORT = Integer.parseInt(args[1]);
+ Server server = new Server(TEST_PORT);
+ server.start();
+ if (server.finish(FINISH_TIME) == 0)
+ throw new Exception("Failure");
+ log.println();
+
+ } else if (args[0].equals("client")) {
+
+ if (args.length < 2) {
+ log.println("No host specified: terminating.");
+ return;
+ }
+ String ip = args[1];
+ if (args.length > 2)
+ TEST_PORT = Integer.parseInt(args[2]);
+ InetAddress ia = InetAddress.getByName(ip);
+ InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT);
+ Client client = new Client(isa);
+ client.start();
+ if (client.finish(FINISH_TIME) == 0)
+ throw new Exception("Failure");
+ log.println();
+
+ } else {
+ System.out.println("Usage note:");
+ System.out.println("java SelectorTest [server] [client <host>] [<port>]");
+ System.out.println("No arguments runs both client and server in separate threads using the default port of 31452.");
+ System.out.println("client runs the client on this machine and connects to the server specified.");
+ System.out.println("server runs the server on localhost.");
+ }
+ }
+
+ static class Client extends TestThread {
+ InetSocketAddress isa;
+ Client(InetSocketAddress isa) {
+ super("Client", SelectorTest.log);
+ this.isa = isa;
+ }
+
+ public void go() throws Exception {
+ log.println("starting client...");
+ for (int i=0; i<NUM_CLIENTS; i++)
+ clientList.add(new RemoteEntity(i, isa, log));
+
+ Collections.shuffle(clientList);
+
+ log.println("created "+NUM_CLIENTS+" clients");
+ do {
+ for (Iterator i = clientList.iterator(); i.hasNext(); ) {
+ RemoteEntity re = (RemoteEntity) i.next();
+ if (re.cycle()) {
+ i.remove();
+ }
+ }
+ Collections.shuffle(clientList);
+ } while (clientList.size() > 0);
+ }
+ }
+
+ static class Server extends TestThread {
+ private final ServerSocketChannel ssc;
+ private List socketList = new ArrayList();
+ private ServerSocket ss;
+ private int connectionsAccepted = 0;
+ private Selector pollSelector;
+ private Selector acceptSelector;
+ private Set pkeys;
+ private Set pskeys;
+
+ Server(int port) throws IOException {
+ super("Server", SelectorTest.log);
+ this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port));
+ }
+
+ int port() {
+ return ssc.socket().getLocalPort();
+ }
+
+ public void go() throws Exception {
+ log.println("starting server...");
+ acceptSelector = SelectorProvider.provider().openSelector();
+ pollSelector = SelectorProvider.provider().openSelector();
+ pkeys = pollSelector.keys();
+ pskeys = pollSelector.selectedKeys();
+ Set readyKeys = acceptSelector.selectedKeys();
+ RequestHandler rh = new RequestHandler(pollSelector, log);
+ Thread requestThread = new Thread(rh);
+
+ requestThread.start();
+
+ ssc.configureBlocking(false);
+ SelectionKey acceptKey = ssc.register(acceptSelector,
+ SelectionKey.OP_ACCEPT);
+ while(connectionsAccepted < SelectorTest.NUM_CLIENTS) {
+ int keysAdded = acceptSelector.select(100);
+ if (keysAdded > 0) {
+ Iterator i = readyKeys.iterator();
+ while(i.hasNext()) {
+ SelectionKey sk = (SelectionKey)i.next();
+ i.remove();
+ ServerSocketChannel nextReady =
+ (ServerSocketChannel)sk.channel();
+ SocketChannel sc = nextReady.accept();
+ connectionsAccepted++;
+ if (sc != null) {
+ sc.configureBlocking(false);
+ synchronized (pkeys) {
+ sc.register(pollSelector, SelectionKey.OP_READ);
+ }
+ } else {
+ throw new RuntimeException(
+ "Socket does not support Channels");
+ }
+ }
+ }
+ }
+ acceptKey.cancel();
+ requestThread.join();
+ acceptSelector.close();
+ pollSelector.close();
+ }
+ }
+}
+
+class RemoteEntity {
+ private static Random rnd = new Random();
+ int id;
+ ByteBuffer data;
+ int dataWrittenIndex;
+ int totalDataLength;
+ boolean initiated = false;
+ boolean connected = false;
+ boolean written = false;
+ boolean acked = false;
+ boolean closed = false;
+ private SocketChannel sc;
+ ByteBuffer ackBuffer;
+ PrintStream log;
+ InetSocketAddress server;
+
+ RemoteEntity(int id, InetSocketAddress server, PrintStream log)
+ throws Exception
+ {
+ int connectFailures = 0;
+ this.id = id;
+ this.log = log;
+ this.server = server;
+
+ sc = SocketChannel.open();
+ sc.configureBlocking(false);
+
+ // Prepare the data buffer to write out from this entity
+ // Let's use both slow and fast buffers
+ if (rnd.nextBoolean())
+ data = ByteBuffer.allocateDirect(100);
+ else
+ data = ByteBuffer.allocate(100);
+ String number = Integer.toString(id);
+ if (number.length() == 1)
+ number = "0"+number;
+ String source = "Testing from " + number;
+ data.put(source.getBytes("8859_1"));
+ data.flip();
+ totalDataLength = source.length();
+
+ // Allocate an ack buffer
+ ackBuffer = ByteBuffer.allocateDirect(10);
+ }
+
+ private void reset() throws Exception {
+ sc.close();
+ sc = SocketChannel.open();
+ sc.configureBlocking(false);
+ }
+
+ private void connect() throws Exception {
+ try {
+ connected = sc.connect(server);
+ initiated = true;
+ } catch (ConnectException e) {
+ initiated = false;
+ reset();
+ }
+ }
+
+ private void finishConnect() throws Exception {
+ try {
+ connected = sc.finishConnect();
+ } catch (IOException e) {
+ initiated = false;
+ reset();
+ }
+ }
+
+ int id() {
+ return id;
+ }
+
+ boolean cycle() throws Exception {
+ if (!initiated)
+ connect();
+ else if (!connected)
+ finishConnect();
+ else if (!written)
+ writeCycle();
+ else if (!acked)
+ ackCycle();
+ else if (!closed)
+ close();
+ return closed;
+ }
+
+ private void ackCycle() throws Exception {
+ //log.println("acking from "+id);
+ int bytesRead = sc.read(ackBuffer);
+ if (bytesRead > 0) {
+ acked = true;
+ }
+ }
+
+ private void close() throws Exception {
+ sc.close();
+ closed = true;
+ }
+
+ private void writeCycle() throws Exception {
+ log.println("writing from "+id);
+ int numBytesToWrite = rnd.nextInt(10)+1;
+ int newWriteTarget = dataWrittenIndex + numBytesToWrite;
+ if (newWriteTarget > totalDataLength)
+ newWriteTarget = totalDataLength;
+ data.limit(newWriteTarget);
+ int bytesWritten = sc.write(data);
+ if (bytesWritten > 0)
+ dataWrittenIndex += bytesWritten;
+ if (dataWrittenIndex == totalDataLength) {
+ written = true;
+ sc.socket().shutdownOutput();
+ }
+ }
+
+}
+
+
+class RequestHandler implements Runnable {
+ private static Random rnd = new Random();
+ private Selector selector;
+ private int connectionsHandled = 0;
+ private HashMap dataBin = new HashMap();
+ PrintStream log;
+
+ public RequestHandler(Selector selector, PrintStream log) {
+ this.selector = selector;
+ this.log = log;
+ }
+
+ public void run() {
+ log.println("starting request handler...");
+ int connectionsAccepted = 0;
+
+ Set nKeys = selector.keys();
+ Set readyKeys = selector.selectedKeys();
+
+ try {
+ while(connectionsHandled < SelectorTest.NUM_CLIENTS) {
+ int numKeys = selector.select(100);
+
+ // Process channels with data
+ synchronized (nKeys) {
+ if (readyKeys.size() > 0) {
+ Iterator i = readyKeys.iterator();
+ while(i.hasNext()) {
+ SelectionKey sk = (SelectionKey)i.next();
+ i.remove();
+ SocketChannel sc = (SocketChannel)sk.channel();
+ if (sc.isOpen())
+ read(sk, sc);
+ }
+ }
+ }
+
+ // Give other threads a chance to run
+ if (numKeys == 0) {
+ try {
+ Thread.sleep(1);
+ } catch (Exception x) {}
+ }
+ }
+ } catch (Exception e) {
+ log.println("Unexpected error 1: "+e);
+ e.printStackTrace();
+ }
+ }
+
+ private void read(SelectionKey sk, SocketChannel sc) throws Exception {
+ ByteBuffer bin = (ByteBuffer)dataBin.get(sc);
+ if (bin == null) {
+ if (rnd.nextBoolean())
+ bin = ByteBuffer.allocateDirect(100);
+ else
+ bin = ByteBuffer.allocate(100);
+ dataBin.put(sc, bin);
+ }
+
+ int bytesRead = 0;
+ do {
+ bytesRead = sc.read(bin);
+ } while(bytesRead > 0);
+
+ if (bytesRead == -1) {
+ sk.interestOps(0);
+ bin.flip();
+ int size = bin.limit();
+ byte[] data = new byte[size];
+ for(int j=0; j<size; j++)
+ data[j] = bin.get();
+ String message = new String(data, "8859_1");
+ connectionsHandled++;
+ acknowledge(sc);
+ log.println("Received >>>"+message + "<<<");
+ log.println("Handled: "+connectionsHandled);
+ }
+ }
+
+ private void acknowledge(SocketChannel sc) throws Exception {
+ ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10);
+ String s = "ack";
+ ackBuffer.put(s.getBytes("8859_1"));
+ ackBuffer.flip();
+ int bytesWritten = 0;
+ while(bytesWritten == 0) {
+ bytesWritten += sc.write(ackBuffer);
+ }
+ sc.close();
+ }
+}