7026287: Asynchronous API sample
Summary: Implement a chat server using the new asynchronous networking API
Reviewed-by: hosterda, alanb
--- a/jdk/make/mksample/nio/Makefile Mon Apr 11 23:20:41 2011 -0700
+++ b/jdk/make/mksample/nio/Makefile Tue Apr 12 09:04:57 2011 +0200
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
#
# This code is free software; you can redistribute it and/or modify it
@@ -31,7 +31,7 @@
PRODUCT = java
include $(BUILDDIR)/common/Defs.gmk
-SUBDIRS = file multicast server
+SUBDIRS = chatserver file multicast server
include $(BUILDDIR)/common/Subdirs.gmk
all build clean clobber::
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/make/mksample/nio/chatserver/Makefile Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,56 @@
+#
+# Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+#
+# This code is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License version 2 only, as
+# published by the Free Software Foundation. Oracle designates this
+# particular file as subject to the "Classpath" exception as provided
+# by Oracle in the LICENSE file that accompanied this code.
+#
+# This code is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# version 2 for more details (a copy is included in the LICENSE file that
+# accompanied this code).
+#
+# You should have received a copy of the GNU General Public License version
+# 2 along with this work; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+# or visit www.oracle.com if you need additional information or have any
+# questions.
+#
+
+#
+# Makefile for the nio/chatserver sample code
+#
+
+BUILDDIR = ../../..
+
+PRODUCT = java
+
+include $(BUILDDIR)/common/Defs.gmk
+
+SAMPLE_SRC_DIR = $(SHARE_SRC)/sample/nio/chatserver
+SAMPLE_DST_DIR = $(SAMPLEDIR)/nio/chatserver
+
+SAMPLE_FILES = \
+ $(SAMPLE_DST_DIR)/ChatServer.java \
+ $(SAMPLE_DST_DIR)/Client.java \
+ $(SAMPLE_DST_DIR)/ClientReader.java \
+ $(SAMPLE_DST_DIR)/DataReader.java \
+ $(SAMPLE_DST_DIR)/MessageReader.java \
+ $(SAMPLE_DST_DIR)/NameReader.java \
+ $(SAMPLE_DST_DIR)/README.txt
+
+all build: $(SAMPLE_FILES)
+
+$(SAMPLE_DST_DIR)/%: $(SAMPLE_SRC_DIR)/%
+ $(install-file)
+
+clean clobber:
+ $(RM) -r $(SAMPLE_DST_DIR)
+
+.PHONY: all build clean clobber
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/ChatServer.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Oracle nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOption;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implements a chat server, this class holds the list of {@code clients} connected to the server.
+ * It sets up a server socket using AsynchronousServerSocketChannel listening to a specified port.
+ */
+public class ChatServer implements Runnable {
+ private final List<Client> connections = Collections.synchronizedList(new ArrayList<Client>());
+ private int port;
+ private final AsynchronousServerSocketChannel listener;
+ private final AsynchronousChannelGroup channelGroup;
+
+ /**
+ *
+ * @param port to listen to
+ * @throws java.io.IOException when failing to start the server
+ */
+ public ChatServer(int port) throws IOException {
+ channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+ Executors.defaultThreadFactory());
+ this.port = port;
+ listener = createListener(channelGroup);
+ }
+
+ /**
+ *
+ * @return The socket address that the server is bound to
+ * @throws java.io.IOException if an I/O error occurs
+ */
+ public SocketAddress getSocketAddress() throws IOException {
+ return listener.getLocalAddress();
+ }
+
+ /**
+ * Start accepting connections
+ */
+ public void run() {
+
+ // call accept to wait for connections, tell it to call our CompletionHandler when there
+ // is a new incoming connection
+ listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
+ @Override
+ public void completed(AsynchronousSocketChannel result, Void attachment) {
+ // request a new accept and handle the incoming connection
+ listener.accept(null, this);
+ handleNewConnection(result);
+ }
+
+ @Override
+ public void failed(Throwable exc, Void attachment) {
+ }
+ });
+ }
+
+ /**
+ * Shuts down the server
+ * @throws InterruptedException if terminated while waiting for shutdown
+ * @throws IOException if failing to shutdown the channel group
+ */
+ public void shutdown() throws InterruptedException, IOException {
+ channelGroup.shutdownNow();
+ channelGroup.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ /*
+ * Creates a listener and starts accepting connections
+ */
+ private AsynchronousServerSocketChannel createListener(AsynchronousChannelGroup channelGroup) throws IOException {
+ final AsynchronousServerSocketChannel listener = openChannel(channelGroup);
+ listener.setOption(StandardSocketOption.SO_REUSEADDR, true);
+ listener.bind(new InetSocketAddress(port));
+ return listener;
+ }
+
+ private AsynchronousServerSocketChannel openChannel(AsynchronousChannelGroup channelGroup) throws IOException {
+ return AsynchronousServerSocketChannel.open(channelGroup);
+ }
+
+ /**
+ * Creates a new client and adds it to the list of connections.
+ * Sets the clients handler to the initial state of NameReader
+ *
+ * @param channel the newly accepted channel
+ */
+ private void handleNewConnection(AsynchronousSocketChannel channel) {
+ Client client = new Client(channel, new ClientReader(this, new NameReader(this)));
+ try {
+ channel.setOption(StandardSocketOption.TCP_NODELAY, true);
+ } catch (IOException e) {
+ // ignore
+ }
+ connections.add(client);
+ client.run();
+ }
+
+ /**
+ * Sends a message to all clients except the source.
+ * The method is synchronized as it is desired that messages are sent to
+ * all clients in the same order as received.
+ *
+ * @param client the message source
+ * @param message the message to be sent
+ */
+ public void writeMessageToClients(Client client, String message) {
+ synchronized (connections) {
+ for (Client clientConnection : connections) {
+ if (clientConnection != client) {
+ clientConnection.writeMessageFrom(client, message);
+ }
+ }
+ }
+ }
+
+ public void removeClient(Client client) {
+ connections.remove(client);
+ }
+
+ private static void usage() {
+ System.err.println("ChatServer [-port <port number>]");
+ System.exit(1);
+ }
+
+ public static void main(String[] args) throws IOException {
+ int port = 5000;
+ if (args.length != 0 && args.length != 2) {
+ usage();
+ } else if (args.length == 2) {
+ try {
+ if (args[0].equals("-port")) {
+ port = Integer.parseInt(args[1]);
+ } else {
+ usage();
+ }
+ } catch (NumberFormatException e) {
+ usage();
+ }
+ }
+ System.out.println("Running on port " + port);
+ new ChatServer(port).run();
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/Client.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Oracle nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Client represents a remote connection to the chat server.
+ * It contains methods for reading and writing messages from the
+ * channel.
+ * Messages are considered to be separated by newline, so incomplete
+ * messages are buffered in the {@code Client}.
+ *
+ * All reads and writes are asynchronous and uses the nio2 asynchronous
+ * elements.
+ */
+class Client {
+ private final AsynchronousSocketChannel channel;
+ private AtomicReference<ClientReader> reader;
+ private String userName;
+ private final StringBuilder messageBuffer = new StringBuilder();
+
+ private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
+ private boolean writing = false;
+
+ public Client(AsynchronousSocketChannel channel, ClientReader reader) {
+ this.channel = channel;
+ this.reader = new AtomicReference<ClientReader>(reader);
+ }
+
+ /**
+ * Enqueues a write of the buffer to the channel.
+ * The call is asynchronous so the buffer is not safe to modify after
+ * passing the buffer here.
+ *
+ * @param buffer the buffer to send to the channel
+ */
+ private void writeMessage(final ByteBuffer buffer) {
+ boolean threadShouldWrite = false;
+
+ synchronized(queue) {
+ queue.add(buffer);
+ // Currently no thread writing, make this thread dispatch a write
+ if (!writing) {
+ writing = true;
+ threadShouldWrite = true;
+ }
+ }
+
+ if (threadShouldWrite) {
+ writeFromQueue();
+ }
+ }
+
+ private void writeFromQueue() {
+ ByteBuffer buffer;
+
+ synchronized (queue) {
+ buffer = queue.poll();
+ if (buffer == null) {
+ writing = false;
+ }
+ }
+
+ // No new data in buffer to write
+ if (writing) {
+ writeBuffer(buffer);
+ }
+ }
+
+ private void writeBuffer(ByteBuffer buffer) {
+ channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
+ @Override
+ public void completed(Integer result, ByteBuffer buffer) {
+ if (buffer.hasRemaining()) {
+ channel.write(buffer, buffer, this);
+ } else {
+ // Go back and check if there is new data to write
+ writeFromQueue();
+ }
+ }
+
+ @Override
+ public void failed(Throwable exc, ByteBuffer attachment) {
+ }
+ });
+ }
+
+ /**
+ * Sends a message
+ * @param string the message
+ */
+ public void writeStringMessage(String string) {
+ writeMessage(ByteBuffer.wrap(string.getBytes()));
+ }
+
+ /**
+ * Send a message from a specific client
+ * @param client the message is sent from
+ * @param message to send
+ */
+ public void writeMessageFrom(Client client, String message) {
+ if (reader.get().acceptsMessages()) {
+ writeStringMessage(client.getUserName() + ": " + message);
+ }
+ }
+
+ /**
+ * Enqueue a read
+ * @param completionHandler callback on completed read
+ */
+ public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
+ ByteBuffer input = ByteBuffer.allocate(256);
+ if (!channel.isOpen()) {
+ return;
+ }
+ channel.read(input, input, completionHandler);
+ }
+
+ /**
+ * Closes the channel
+ */
+ public void close() {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Run the current states actions.
+ */
+ public void run() {
+ reader.get().run(this);
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public void setReader(ClientReader reader) {
+ this.reader.set(reader);
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void appendMessage(String message) {
+ synchronized (messageBuffer) {
+ messageBuffer.append(message);
+ }
+ }
+
+ /**
+ * @return the next newline separated message in the buffer. null is returned if the buffer
+ * doesn't contain any newline.
+ */
+ public String nextMessage() {
+ synchronized(messageBuffer) {
+ int nextNewline = messageBuffer.indexOf("\n");
+ if (nextNewline == -1) {
+ return null;
+ }
+ String message = messageBuffer.substring(0, nextNewline + 1);
+ messageBuffer.delete(0, nextNewline + 1);
+ return message;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/ClientReader.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Oracle nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+
+/**
+ * Handles a cycle of reading / writing on the {@code Client}.
+ */
+class ClientReader {
+ private final DataReader callback;
+ private final ChatServer chatServer;
+
+ ClientReader(ChatServer chatServer, DataReader callback) {
+ this.chatServer = chatServer;
+ this.callback = callback;
+ }
+
+ public boolean acceptsMessages() {
+ return callback.acceptsMessages();
+ }
+
+ /**
+ * Runs a cycle of doing a beforeRead action and then enqueing a new
+ * read on the client. Handles closed channels and errors while reading.
+ * If the client is still connected a new round of actions are called.
+ */
+ public void run(final Client client) {
+ callback.beforeRead(client);
+ client.read(new CompletionHandler<Integer, ByteBuffer>() {
+ @Override
+ public void completed(Integer result, ByteBuffer buffer) {
+ // if result is negative or zero the connection has been closed or something gone wrong
+ if (result < 1) {
+ client.close();
+ System.out.println("Closing connection to " + client);
+ chatServer.removeClient(client);
+ } else {
+ callback.onData(client, buffer, result);
+ // enqueue next round of actions
+ client.run();
+ }
+ }
+
+ @Override
+ public void failed(Throwable exc, ByteBuffer buffer) {
+ client.close();
+ chatServer.removeClient(client);
+ }
+ });
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/DataReader.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Oracle nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.nio.ByteBuffer;
+
+public interface DataReader {
+ void beforeRead(Client client);
+ void onData(Client client, ByteBuffer buffer, int bytes);
+ boolean acceptsMessages();
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/MessageReader.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Oracle nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.nio.ByteBuffer;
+
+/**
+ * Writes all messages in our buffer to the other clients
+ * and appends new data read from the socket to our buffer
+ */
+class MessageReader implements DataReader {
+ private final ChatServer chatServer;
+
+ public MessageReader(ChatServer chatServer) {
+ this.chatServer = chatServer;
+ }
+
+ public boolean acceptsMessages() {
+ return true;
+ }
+
+ /**
+ * Write all full messages in our buffer to
+ * the other clients
+ *
+ * @param client the client to read messages from
+ */
+ @Override
+ public void beforeRead(Client client) {
+ // Check if we have any messages buffered and send them
+ String message = client.nextMessage();
+ while (message != null) {
+ chatServer.writeMessageToClients(client, message);
+ message = client.nextMessage();
+ }
+ }
+
+ /**
+ * Append the read buffer to the clients message buffer
+ * @param client the client to append messages to
+ * @param buffer the buffer we received from the socket
+ * @param bytes the number of bytes read into the buffer
+ */
+ @Override
+ public void onData(Client client, ByteBuffer buffer, int bytes) {
+ buffer.flip();
+ // Just append the message on the buffer
+ client.appendMessage(new String(buffer.array(), 0, bytes));
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/NameReader.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * - Neither the name of Oracle nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+import java.nio.ByteBuffer;
+
+/**
+ * The first state a newly connected {@code Client} is in, this
+ * handles writing out the welcoming message and reads the response
+ * up to a newline. When a newline character have been received
+ * it changes the handler from NameReader to MessageReader on the
+ * client.
+ */
+class NameReader implements DataReader {
+ private final StringBuilder buffer = new StringBuilder();
+ private final ChatServer chatServer;
+ private boolean once = true;
+ private static final String NEWLINE = "\n";
+
+ public NameReader(ChatServer chatServer) {
+ this.chatServer = chatServer;
+ }
+
+ /**
+ * Writes the welcoming message to the client the first time this method
+ * is called.
+ *
+ * @param client the client to receive the message
+ */
+ @Override
+ public void beforeRead(Client client) {
+ // if it is a long name that takes more than one read we only want to display Name: once.
+ if (once) {
+ client.writeStringMessage("Name: ");
+ once = false;
+ }
+ }
+
+ public boolean acceptsMessages() {
+ return false;
+ }
+
+ /**
+ * Receives incoming data from the socket, searches for a newline
+ * and tries to set the username if one is found
+ */
+ @Override
+ public void onData(Client client, ByteBuffer buffer, int bytes) {
+ buffer.flip();
+ String name;
+ name = this.buffer.append(new String(buffer.array(), 0, bytes)).toString();
+ if (name.contains(NEWLINE)) {
+ onUserNameRead(client, name);
+ }
+ }
+
+ /**
+ * Splits the name on the newlines, takes the first as the username
+ * and appends everything else to the clients message buffer.
+ * Sets the clients handler to MessageReader.
+ *
+ * @param client the client to set the username for
+ * @param name the string containing the buffered input
+ */
+ private void onUserNameRead(Client client, String name) {
+ String[] strings = name.split(NEWLINE, 2);
+ client.setUserName(strings[0].trim());
+ sendRemainingParts(client, strings);
+ client.setReader(new ClientReader(chatServer, new MessageReader(chatServer)));
+ client.writeStringMessage("Welcome " + client.getUserName() + "\n");
+ }
+
+ /**
+ * Appends the remaining parts to the clients message buffer
+ *
+ * @param client the client
+ * @param strings the messages to append to the buffer
+ */
+ private void sendRemainingParts(Client client, String[] strings) {
+ for (int i = 1; i < strings.length; ++i) {
+ client.appendMessage(strings[i]);
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/sample/nio/chatserver/README.txt Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,62 @@
+A Simple Chat Server Example
+
+INTRODUCTION
+============
+This directory contains a very simple chat server, the server takes input from a
+socket ("user") and sends it to all other connected sockets ("users") along with
+the provided name the user was asked for when first connecting.
+
+The server was written to demonstrate the asynchronous I/O API in JDK 7.
+The sample assumes the reader has some familiarity with the subject matter.
+
+SETUP
+=====
+
+The server must be built with version 7 (or later) of the JDK.
+The server is built with:
+
+ % mkdir build
+ % javac -source 7 -target 7 -d build *.java
+
+EXECUTION
+=========
+
+ % java -classpath build ChatServer [-port <port number>]
+
+ Usage: ChatServer [options]
+ options:
+ -port port port number
+ default: 5000
+
+CLIENT EXECUTION
+================
+
+No client binary is included in the sample.
+Connections can be made using for example the telnet command or any program
+that supports a raw TCP connection to a port.
+
+SOURCE CODE OVERVIEW
+====================
+ChatServer is the main class, it handles the startup and handles incoming
+connections on the listening sockets. It keeps a list of connected client
+and provides methods for sending a message to them.
+
+Client represents a connected user, it provides methods for reading/writing
+from/to the underlying socket. It also contains a buffer of input read from
+the user.
+
+DataReader provides the interface of the two states a user can
+be in. Waiting for a name (and not receiving any messages while doing so, implemented
+by NameReader) and waiting for messages from the user (implemented by MessageReader).
+
+ClientReader contains the "main loop" for a connected client.
+
+NameReader is the initial state for a new client, it sends the user a string and
+waits for a response before changing the state to MessageReader.
+
+MessageReader is the main state for a client, it checks for new messages to send to
+other clients and reads messages from the client.
+
+FINALLY
+=======
+This is a sample: it is not production quality and isn't optimized for performance.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/sample/chatserver/ChatTest.java Tue Apr 12 09:04:57 2011 +0200
@@ -0,0 +1,399 @@
+/*
+ * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+
+/* @test
+ * @summary Test chat server chatserver test
+ *
+ * @library ../../../src/share/sample/nio/chatserver
+ * @build ChatTest ChatServer Client ClientReader DataReader MessageReader NameReader
+ * @run main ChatTest
+ */
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+
+public class ChatTest {
+ public static int listeningPort = 0;
+
+ public static void main(String[] args) throws Throwable {
+ testStartStop();
+ testPortOpen();
+ testAsksForName();
+ testUseName();
+ testConnectDisconnectConnect();
+ testUsernameAndMessage();
+ testDontReceiveMessageInNameState();
+ }
+
+ private static ChatServer startServer() throws IOException {
+ ChatServer server = new ChatServer(0);
+ InetSocketAddress address = (InetSocketAddress) server.getSocketAddress();
+ listeningPort = address.getPort();
+ server.run();
+ return server;
+ }
+
+ public static void testStartStop() throws Exception {
+ ChatServer server = startServer();
+ server.shutdown();
+ }
+
+ public static void testPortOpen() throws Exception {
+ ChatServer server = startServer();
+ try {
+ Socket socket = new Socket("localhost", listeningPort);
+ if (!socket.isConnected()) {
+ throw new RuntimeException("Failed to connect to server: port not open");
+ }
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ public static void testAsksForName() throws Exception {
+ ChatServer server = startServer();
+ try {
+ Socket socket = new Socket("localhost", listeningPort);
+
+ Reader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String string = readAvailableString(reader);
+ if (!string.equals("Name: ")) {
+ throw new RuntimeException("Server doesn't send Name: ");
+ }
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ public static void testUseName() throws Throwable {
+ ChatServer server = startServer();
+ try {
+ performTestUseName();
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ public static void testConnectDisconnectConnect() throws Exception {
+ ChatServer server = startServer();
+ try {
+ performTestConnectDisconnectConnect();
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ public static void testUsernameAndMessage() throws Exception {
+ ChatServer server = startServer();
+ try {
+ performTestUsernameAndMessage();
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ public static void testDontReceiveMessageInNameState() throws Exception {
+ ChatServer server = startServer();
+ try {
+ performDontReceiveMessageInNameState();
+ } finally {
+ server.shutdown();
+ }
+ }
+
+ private static void assertEqual(List<Exception> exception, Object value, Object expected) {
+ if (expected == value) {
+ return;
+ }
+ if (expected == null) {
+ exception.add(new RuntimeException("Expected null, but was: " + value));
+ return;
+ }
+ if (!expected.equals(value)) {
+ exception.add(new RuntimeException("Expected: " + expected + " but was: " + value));
+ return;
+ }
+ }
+
+ private static void performDontReceiveMessageInNameState() throws Exception {
+ final CyclicBarrier barrier1 = new CyclicBarrier(2);
+ final CyclicBarrier barrier2 = new CyclicBarrier(2);
+ final CyclicBarrier barrier3 = new CyclicBarrier(2);
+ final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+ ChatConnection chatConnection = new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "Name: ");
+ writer.write("testClient1\n");
+ waitForJoin(reader, "testClient1");
+ barrier1.await();
+ writer.write("Ignore this!\n");
+ barrier2.await();
+ barrier3.await();
+ }
+ };
+
+ Thread client2 = new Thread(new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ barrier1.await();
+ barrier2.await();
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "Name: ");
+ string = readAvailableString(reader, true);
+ assertEqual(exceptions, string, null);
+ writer.write("testClient2\n");
+ barrier3.await();
+ }
+ });
+
+ client2.start();
+ chatConnection.run();
+ if (!exceptions.isEmpty()) {
+ throw exceptions.get(0);
+ }
+
+ }
+
+ private static void waitForJoin(BufferedReader reader, String s) throws IOException {
+ String joined;
+ do {
+ joined = readAvailableString(reader);
+ } while (!(joined != null && joined.contains("Welcome " + s)));
+ }
+
+ private static void performTestUsernameAndMessage() throws Exception {
+ final CyclicBarrier barrier1 = new CyclicBarrier(2);
+ final CyclicBarrier barrier2 = new CyclicBarrier(2);
+ final CyclicBarrier barrier3 = new CyclicBarrier(2);
+ final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+ ChatConnection chatConnection = new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "Name: ");
+ writer.write("testClient1\n");
+ waitForJoin(reader, "testClient1");
+ barrier1.await();
+ barrier2.await();
+ string = readAvailableString(reader);
+ assertEqual(exceptions, string, "testClient2: Hello world!\n");
+ barrier3.await();
+ }
+ };
+
+ Thread client2 = new Thread(new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "Name: ");
+ barrier1.await();
+ writer.write("testClient2\nHello world!\n");
+ barrier2.await();
+ barrier3.await();
+ }
+ });
+
+ client2.start();
+ chatConnection.run();
+ if (!exceptions.isEmpty()) {
+ throw exceptions.get(0);
+ }
+ }
+
+ private static void performTestConnectDisconnectConnect() throws Exception {
+ final CyclicBarrier barrier1 = new CyclicBarrier(2);
+ final CyclicBarrier barrier2 = new CyclicBarrier(2);
+ final CyclicBarrier barrier3 = new CyclicBarrier(2);
+ final List<Exception> exceptions = new ArrayList<Exception>();
+
+ ChatConnection chatConnection = new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "Name: ");
+ writer.write("testClient1\n");
+ }
+ };
+
+ ChatConnection chatConnection2 = new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ readAvailableString(reader);
+ writer.write("testClient1\n");
+ waitForJoin(reader, "testClient1");
+ barrier1.await();
+ writer.write("Good morning!\n");
+ barrier2.await();
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "testClient2: Hello world!\n");
+ barrier3.await();
+ }
+ };
+
+ Thread client2 = new Thread(new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ readAvailableString(reader);
+ writer.write("testClient2\n");
+ waitForJoin(reader, "testClient2");
+ barrier1.await();
+ writer.write("Hello world!\n");
+ barrier2.await();
+ String string = readAvailableString(reader);
+ assertEqual(exceptions, string, "testClient1: Good morning!\n");
+ barrier3.await();
+ }
+ });
+
+ client2.start();
+ chatConnection.run();
+ chatConnection2.run();
+ if (!exceptions.isEmpty()) {
+ throw exceptions.get(0);
+ }
+ }
+
+ private static void performTestUseName() throws Exception {
+ final CyclicBarrier barrier1 = new CyclicBarrier(2);
+ final CyclicBarrier barrier2 = new CyclicBarrier(2);
+ final CyclicBarrier barrier3 = new CyclicBarrier(2);
+ final List<Exception> exceptions = new ArrayList<Exception>();
+
+ ChatConnection chatConnection = new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ String string = readAvailableString(reader);
+ if (!"Name: ".equals(string)) {
+ exceptions.add(new RuntimeException("Expected Name: "));
+ }
+ writer.write("testClient1\n");
+ waitForJoin(reader, "testClient1");
+ barrier1.await();
+ barrier2.await();
+ string = readAvailableString(reader);
+ if (!"testClient2: Hello world!\n".equals(string)) {
+ exceptions.add(new RuntimeException("testClient2: Hello world!\n"));
+ }
+ barrier3.await();
+ }
+ };
+
+ Thread client2 = new Thread(new ChatConnection() {
+ @Override
+ public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception {
+ String string = readAvailableString(reader);
+ if (!"Name: ".equals(string)) {
+ exceptions.add(new RuntimeException("Expected Name: "));
+ }
+ writer.write("testClient2\n");
+ waitForJoin(reader, "testClient2");
+ barrier1.await();
+ writer.write("Hello world!\n");
+ barrier2.await();
+ barrier3.await();
+ }
+ });
+
+ client2.start();
+ chatConnection.run();
+ if (!exceptions.isEmpty()) {
+ throw exceptions.get(0);
+ }
+ }
+
+ private static String readAvailableString(Reader reader) throws IOException {
+ return readAvailableString(reader, false);
+ }
+
+ private static String readAvailableString(Reader reader, boolean now) throws IOException {
+ StringBuilder builder = new StringBuilder();
+ int bytes;
+ if (now && !reader.ready()) {
+ return null;
+ }
+ do {
+ char[] buf = new char[256];
+ bytes = reader.read(buf);
+ builder.append(buf, 0, bytes);
+ } while (bytes == 256);
+ return builder.toString();
+ }
+
+ private abstract static class ChatConnection implements Runnable {
+ public Exception exception;
+
+ @Override
+ public void run() {
+ try (Socket socket = new Socket("localhost", listeningPort);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ Writer writer = new FlushingWriter(new OutputStreamWriter(socket.getOutputStream()))) {
+ socket.setTcpNoDelay(true);
+
+ run(socket, reader, writer);
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+
+ public abstract void run(Socket socket, BufferedReader reader, Writer writer) throws Exception;
+ }
+
+ private static class FlushingWriter extends Writer {
+ public final Writer delegate;
+
+ private FlushingWriter(Writer delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void write(char[] cbuf, int off, int len) throws IOException {
+ delegate.write(cbuf, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public void write(String str) throws IOException {
+ super.write(str);
+ flush();
+ }
+ }
+}