# HG changeset patch # User rbackman # Date 1302591897 -7200 # Node ID 142881732d3cc25703cbb35ee5f48023585cfad1 # Parent 12f80cd905053174850327ca540acd52d7afe5ea 7026287: Asynchronous API sample Summary: Implement a chat server using the new asynchronous networking API Reviewed-by: hosterda, alanb diff -r 12f80cd90505 -r 142881732d3c jdk/make/mksample/nio/Makefile --- a/jdk/make/mksample/nio/Makefile Mon Apr 11 23:20:41 2011 -0700 +++ b/jdk/make/mksample/nio/Makefile Tue Apr 12 09:04:57 2011 +0200 @@ -1,5 +1,5 @@ # -# Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved. # DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. # # This code is free software; you can redistribute it and/or modify it @@ -31,7 +31,7 @@ PRODUCT = java include $(BUILDDIR)/common/Defs.gmk -SUBDIRS = file multicast server +SUBDIRS = chatserver file multicast server include $(BUILDDIR)/common/Subdirs.gmk all build clean clobber:: diff -r 12f80cd90505 -r 142881732d3c jdk/make/mksample/nio/chatserver/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/make/mksample/nio/chatserver/Makefile Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,56 @@ +# +# Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. +# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. +# +# This code is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 only, as +# published by the Free Software Foundation. Oracle designates this +# particular file as subject to the "Classpath" exception as provided +# by Oracle in the LICENSE file that accompanied this code. +# +# This code is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# version 2 for more details (a copy is included in the LICENSE file that +# accompanied this code). +# +# You should have received a copy of the GNU General Public License version +# 2 along with this work; if not, write to the Free Software Foundation, +# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA +# or visit www.oracle.com if you need additional information or have any +# questions. +# + +# +# Makefile for the nio/chatserver sample code +# + +BUILDDIR = ../../.. + +PRODUCT = java + +include $(BUILDDIR)/common/Defs.gmk + +SAMPLE_SRC_DIR = $(SHARE_SRC)/sample/nio/chatserver +SAMPLE_DST_DIR = $(SAMPLEDIR)/nio/chatserver + +SAMPLE_FILES = \ + $(SAMPLE_DST_DIR)/ChatServer.java \ + $(SAMPLE_DST_DIR)/Client.java \ + $(SAMPLE_DST_DIR)/ClientReader.java \ + $(SAMPLE_DST_DIR)/DataReader.java \ + $(SAMPLE_DST_DIR)/MessageReader.java \ + $(SAMPLE_DST_DIR)/NameReader.java \ + $(SAMPLE_DST_DIR)/README.txt + +all build: $(SAMPLE_FILES) + +$(SAMPLE_DST_DIR)/%: $(SAMPLE_SRC_DIR)/% + $(install-file) + +clean clobber: + $(RM) -r $(SAMPLE_DST_DIR) + +.PHONY: all build clean clobber diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/ChatServer.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/ChatServer.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.StandardSocketOption; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Implements a chat server, this class holds the list of {@code clients} connected to the server. + * It sets up a server socket using AsynchronousServerSocketChannel listening to a specified port. + */ +public class ChatServer implements Runnable { + private final List connections = Collections.synchronizedList(new ArrayList()); + private int port; + private final AsynchronousServerSocketChannel listener; + private final AsynchronousChannelGroup channelGroup; + + /** + * + * @param port to listen to + * @throws java.io.IOException when failing to start the server + */ + public ChatServer(int port) throws IOException { + channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), + Executors.defaultThreadFactory()); + this.port = port; + listener = createListener(channelGroup); + } + + /** + * + * @return The socket address that the server is bound to + * @throws java.io.IOException if an I/O error occurs + */ + public SocketAddress getSocketAddress() throws IOException { + return listener.getLocalAddress(); + } + + /** + * Start accepting connections + */ + public void run() { + + // call accept to wait for connections, tell it to call our CompletionHandler when there + // is a new incoming connection + listener.accept(null, new CompletionHandler() { + @Override + public void completed(AsynchronousSocketChannel result, Void attachment) { + // request a new accept and handle the incoming connection + listener.accept(null, this); + handleNewConnection(result); + } + + @Override + public void failed(Throwable exc, Void attachment) { + } + }); + } + + /** + * Shuts down the server + * @throws InterruptedException if terminated while waiting for shutdown + * @throws IOException if failing to shutdown the channel group + */ + public void shutdown() throws InterruptedException, IOException { + channelGroup.shutdownNow(); + channelGroup.awaitTermination(1, TimeUnit.SECONDS); + } + + /* + * Creates a listener and starts accepting connections + */ + private AsynchronousServerSocketChannel createListener(AsynchronousChannelGroup channelGroup) throws IOException { + final AsynchronousServerSocketChannel listener = openChannel(channelGroup); + listener.setOption(StandardSocketOption.SO_REUSEADDR, true); + listener.bind(new InetSocketAddress(port)); + return listener; + } + + private AsynchronousServerSocketChannel openChannel(AsynchronousChannelGroup channelGroup) throws IOException { + return AsynchronousServerSocketChannel.open(channelGroup); + } + + /** + * Creates a new client and adds it to the list of connections. + * Sets the clients handler to the initial state of NameReader + * + * @param channel the newly accepted channel + */ + private void handleNewConnection(AsynchronousSocketChannel channel) { + Client client = new Client(channel, new ClientReader(this, new NameReader(this))); + try { + channel.setOption(StandardSocketOption.TCP_NODELAY, true); + } catch (IOException e) { + // ignore + } + connections.add(client); + client.run(); + } + + /** + * Sends a message to all clients except the source. + * The method is synchronized as it is desired that messages are sent to + * all clients in the same order as received. + * + * @param client the message source + * @param message the message to be sent + */ + public void writeMessageToClients(Client client, String message) { + synchronized (connections) { + for (Client clientConnection : connections) { + if (clientConnection != client) { + clientConnection.writeMessageFrom(client, message); + } + } + } + } + + public void removeClient(Client client) { + connections.remove(client); + } + + private static void usage() { + System.err.println("ChatServer [-port ]"); + System.exit(1); + } + + public static void main(String[] args) throws IOException { + int port = 5000; + if (args.length != 0 && args.length != 2) { + usage(); + } else if (args.length == 2) { + try { + if (args[0].equals("-port")) { + port = Integer.parseInt(args[1]); + } else { + usage(); + } + } catch (NumberFormatException e) { + usage(); + } + } + System.out.println("Running on port " + port); + new ChatServer(port).run(); + } +} diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/Client.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/Client.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Client represents a remote connection to the chat server. + * It contains methods for reading and writing messages from the + * channel. + * Messages are considered to be separated by newline, so incomplete + * messages are buffered in the {@code Client}. + * + * All reads and writes are asynchronous and uses the nio2 asynchronous + * elements. + */ +class Client { + private final AsynchronousSocketChannel channel; + private AtomicReference reader; + private String userName; + private final StringBuilder messageBuffer = new StringBuilder(); + + private final Queue queue = new LinkedList(); + private boolean writing = false; + + public Client(AsynchronousSocketChannel channel, ClientReader reader) { + this.channel = channel; + this.reader = new AtomicReference(reader); + } + + /** + * Enqueues a write of the buffer to the channel. + * The call is asynchronous so the buffer is not safe to modify after + * passing the buffer here. + * + * @param buffer the buffer to send to the channel + */ + private void writeMessage(final ByteBuffer buffer) { + boolean threadShouldWrite = false; + + synchronized(queue) { + queue.add(buffer); + // Currently no thread writing, make this thread dispatch a write + if (!writing) { + writing = true; + threadShouldWrite = true; + } + } + + if (threadShouldWrite) { + writeFromQueue(); + } + } + + private void writeFromQueue() { + ByteBuffer buffer; + + synchronized (queue) { + buffer = queue.poll(); + if (buffer == null) { + writing = false; + } + } + + // No new data in buffer to write + if (writing) { + writeBuffer(buffer); + } + } + + private void writeBuffer(ByteBuffer buffer) { + channel.write(buffer, buffer, new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer buffer) { + if (buffer.hasRemaining()) { + channel.write(buffer, buffer, this); + } else { + // Go back and check if there is new data to write + writeFromQueue(); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + } + }); + } + + /** + * Sends a message + * @param string the message + */ + public void writeStringMessage(String string) { + writeMessage(ByteBuffer.wrap(string.getBytes())); + } + + /** + * Send a message from a specific client + * @param client the message is sent from + * @param message to send + */ + public void writeMessageFrom(Client client, String message) { + if (reader.get().acceptsMessages()) { + writeStringMessage(client.getUserName() + ": " + message); + } + } + + /** + * Enqueue a read + * @param completionHandler callback on completed read + */ + public void read(CompletionHandler completionHandler) { + ByteBuffer input = ByteBuffer.allocate(256); + if (!channel.isOpen()) { + return; + } + channel.read(input, input, completionHandler); + } + + /** + * Closes the channel + */ + public void close() { + try { + channel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Run the current states actions. + */ + public void run() { + reader.get().run(this); + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public void setReader(ClientReader reader) { + this.reader.set(reader); + } + + public String getUserName() { + return userName; + } + + public void appendMessage(String message) { + synchronized (messageBuffer) { + messageBuffer.append(message); + } + } + + /** + * @return the next newline separated message in the buffer. null is returned if the buffer + * doesn't contain any newline. + */ + public String nextMessage() { + synchronized(messageBuffer) { + int nextNewline = messageBuffer.indexOf("\n"); + if (nextNewline == -1) { + return null; + } + String message = messageBuffer.substring(0, nextNewline + 1); + messageBuffer.delete(0, nextNewline + 1); + return message; + } + } +} diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/ClientReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/ClientReader.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; + +/** + * Handles a cycle of reading / writing on the {@code Client}. + */ +class ClientReader { + private final DataReader callback; + private final ChatServer chatServer; + + ClientReader(ChatServer chatServer, DataReader callback) { + this.chatServer = chatServer; + this.callback = callback; + } + + public boolean acceptsMessages() { + return callback.acceptsMessages(); + } + + /** + * Runs a cycle of doing a beforeRead action and then enqueing a new + * read on the client. Handles closed channels and errors while reading. + * If the client is still connected a new round of actions are called. + */ + public void run(final Client client) { + callback.beforeRead(client); + client.read(new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer buffer) { + // if result is negative or zero the connection has been closed or something gone wrong + if (result < 1) { + client.close(); + System.out.println("Closing connection to " + client); + chatServer.removeClient(client); + } else { + callback.onData(client, buffer, result); + // enqueue next round of actions + client.run(); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer buffer) { + client.close(); + chatServer.removeClient(client); + } + }); + } +} diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/DataReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/DataReader.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import java.nio.ByteBuffer; + +public interface DataReader { + void beforeRead(Client client); + void onData(Client client, ByteBuffer buffer, int bytes); + boolean acceptsMessages(); +} diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/MessageReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/MessageReader.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import java.nio.ByteBuffer; + +/** + * Writes all messages in our buffer to the other clients + * and appends new data read from the socket to our buffer + */ +class MessageReader implements DataReader { + private final ChatServer chatServer; + + public MessageReader(ChatServer chatServer) { + this.chatServer = chatServer; + } + + public boolean acceptsMessages() { + return true; + } + + /** + * Write all full messages in our buffer to + * the other clients + * + * @param client the client to read messages from + */ + @Override + public void beforeRead(Client client) { + // Check if we have any messages buffered and send them + String message = client.nextMessage(); + while (message != null) { + chatServer.writeMessageToClients(client, message); + message = client.nextMessage(); + } + } + + /** + * Append the read buffer to the clients message buffer + * @param client the client to append messages to + * @param buffer the buffer we received from the socket + * @param bytes the number of bytes read into the buffer + */ + @Override + public void onData(Client client, ByteBuffer buffer, int bytes) { + buffer.flip(); + // Just append the message on the buffer + client.appendMessage(new String(buffer.array(), 0, bytes)); + } +} diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/NameReader.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/NameReader.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of Oracle nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +import java.nio.ByteBuffer; + +/** + * The first state a newly connected {@code Client} is in, this + * handles writing out the welcoming message and reads the response + * up to a newline. When a newline character have been received + * it changes the handler from NameReader to MessageReader on the + * client. + */ +class NameReader implements DataReader { + private final StringBuilder buffer = new StringBuilder(); + private final ChatServer chatServer; + private boolean once = true; + private static final String NEWLINE = "\n"; + + public NameReader(ChatServer chatServer) { + this.chatServer = chatServer; + } + + /** + * Writes the welcoming message to the client the first time this method + * is called. + * + * @param client the client to receive the message + */ + @Override + public void beforeRead(Client client) { + // if it is a long name that takes more than one read we only want to display Name: once. + if (once) { + client.writeStringMessage("Name: "); + once = false; + } + } + + public boolean acceptsMessages() { + return false; + } + + /** + * Receives incoming data from the socket, searches for a newline + * and tries to set the username if one is found + */ + @Override + public void onData(Client client, ByteBuffer buffer, int bytes) { + buffer.flip(); + String name; + name = this.buffer.append(new String(buffer.array(), 0, bytes)).toString(); + if (name.contains(NEWLINE)) { + onUserNameRead(client, name); + } + } + + /** + * Splits the name on the newlines, takes the first as the username + * and appends everything else to the clients message buffer. + * Sets the clients handler to MessageReader. + * + * @param client the client to set the username for + * @param name the string containing the buffered input + */ + private void onUserNameRead(Client client, String name) { + String[] strings = name.split(NEWLINE, 2); + client.setUserName(strings[0].trim()); + sendRemainingParts(client, strings); + client.setReader(new ClientReader(chatServer, new MessageReader(chatServer))); + client.writeStringMessage("Welcome " + client.getUserName() + "\n"); + } + + /** + * Appends the remaining parts to the clients message buffer + * + * @param client the client + * @param strings the messages to append to the buffer + */ + private void sendRemainingParts(Client client, String[] strings) { + for (int i = 1; i < strings.length; ++i) { + client.appendMessage(strings[i]); + } + } +} diff -r 12f80cd90505 -r 142881732d3c jdk/src/share/sample/nio/chatserver/README.txt --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/share/sample/nio/chatserver/README.txt Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,62 @@ +A Simple Chat Server Example + +INTRODUCTION +============ +This directory contains a very simple chat server, the server takes input from a +socket ("user") and sends it to all other connected sockets ("users") along with +the provided name the user was asked for when first connecting. + +The server was written to demonstrate the asynchronous I/O API in JDK 7. +The sample assumes the reader has some familiarity with the subject matter. + +SETUP +===== + +The server must be built with version 7 (or later) of the JDK. +The server is built with: + + % mkdir build + % javac -source 7 -target 7 -d build *.java + +EXECUTION +========= + + % java -classpath build ChatServer [-port ] + + Usage: ChatServer [options] + options: + -port port port number + default: 5000 + +CLIENT EXECUTION +================ + +No client binary is included in the sample. +Connections can be made using for example the telnet command or any program +that supports a raw TCP connection to a port. + +SOURCE CODE OVERVIEW +==================== +ChatServer is the main class, it handles the startup and handles incoming +connections on the listening sockets. It keeps a list of connected client +and provides methods for sending a message to them. + +Client represents a connected user, it provides methods for reading/writing +from/to the underlying socket. It also contains a buffer of input read from +the user. + +DataReader provides the interface of the two states a user can +be in. Waiting for a name (and not receiving any messages while doing so, implemented +by NameReader) and waiting for messages from the user (implemented by MessageReader). + +ClientReader contains the "main loop" for a connected client. + +NameReader is the initial state for a new client, it sends the user a string and +waits for a response before changing the state to MessageReader. + +MessageReader is the main state for a client, it checks for new messages to send to +other clients and reads messages from the client. + +FINALLY +======= +This is a sample: it is not production quality and isn't optimized for performance. diff -r 12f80cd90505 -r 142881732d3c jdk/test/sample/chatserver/ChatTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/sample/chatserver/ChatTest.java Tue Apr 12 09:04:57 2011 +0200 @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + + +/* @test + * @summary Test chat server chatserver test + * + * @library ../../../src/share/sample/nio/chatserver + * @build ChatTest ChatServer Client ClientReader DataReader MessageReader NameReader + * @run main ChatTest + */ + +import java.io.*; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CyclicBarrier; + +public class ChatTest { + public static int listeningPort = 0; + + public static void main(String[] args) throws Throwable { + testStartStop(); + testPortOpen(); + testAsksForName(); + testUseName(); + testConnectDisconnectConnect(); + testUsernameAndMessage(); + testDontReceiveMessageInNameState(); + } + + private static ChatServer startServer() throws IOException { + ChatServer server = new ChatServer(0); + InetSocketAddress address = (InetSocketAddress) server.getSocketAddress(); + listeningPort = address.getPort(); + server.run(); + return server; + } + + public static void testStartStop() throws Exception { + ChatServer server = startServer(); + server.shutdown(); + } + + public static void testPortOpen() throws Exception { + ChatServer server = startServer(); + try { + Socket socket = new Socket("localhost", listeningPort); + if (!socket.isConnected()) { + throw new RuntimeException("Failed to connect to server: port not open"); + } + } finally { + server.shutdown(); + } + } + + public static void testAsksForName() throws Exception { + ChatServer server = startServer(); + try { + Socket socket = new Socket("localhost", listeningPort); + + Reader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String string = readAvailableString(reader); + if (!string.equals("Name: ")) { + throw new RuntimeException("Server doesn't send Name: "); + } + } finally { + server.shutdown(); + } + } + + public static void testUseName() throws Throwable { + ChatServer server = startServer(); + try { + performTestUseName(); + } finally { + server.shutdown(); + } + } + + public static void testConnectDisconnectConnect() throws Exception { + ChatServer server = startServer(); + try { + performTestConnectDisconnectConnect(); + } finally { + server.shutdown(); + } + } + + public static void testUsernameAndMessage() throws Exception { + ChatServer server = startServer(); + try { + performTestUsernameAndMessage(); + } finally { + server.shutdown(); + } + } + + public static void testDontReceiveMessageInNameState() throws Exception { + ChatServer server = startServer(); + try { + performDontReceiveMessageInNameState(); + } finally { + server.shutdown(); + } + } + + private static void assertEqual(List exception, Object value, Object expected) { + if (expected == value) { + return; + } + if (expected == null) { + exception.add(new RuntimeException("Expected null, but was: " + value)); + return; + } + if (!expected.equals(value)) { + exception.add(new RuntimeException("Expected: " + expected + " but was: " + value)); + return; + } + } + + private static void performDontReceiveMessageInNameState() throws Exception { + final CyclicBarrier barrier1 = new CyclicBarrier(2); + final CyclicBarrier barrier2 = new CyclicBarrier(2); + final CyclicBarrier barrier3 = new CyclicBarrier(2); + final List exceptions = Collections.synchronizedList(new ArrayList()); + + ChatConnection chatConnection = new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + String string = readAvailableString(reader); + assertEqual(exceptions, string, "Name: "); + writer.write("testClient1\n"); + waitForJoin(reader, "testClient1"); + barrier1.await(); + writer.write("Ignore this!\n"); + barrier2.await(); + barrier3.await(); + } + }; + + Thread client2 = new Thread(new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + barrier1.await(); + barrier2.await(); + String string = readAvailableString(reader); + assertEqual(exceptions, string, "Name: "); + string = readAvailableString(reader, true); + assertEqual(exceptions, string, null); + writer.write("testClient2\n"); + barrier3.await(); + } + }); + + client2.start(); + chatConnection.run(); + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + + } + + private static void waitForJoin(BufferedReader reader, String s) throws IOException { + String joined; + do { + joined = readAvailableString(reader); + } while (!(joined != null && joined.contains("Welcome " + s))); + } + + private static void performTestUsernameAndMessage() throws Exception { + final CyclicBarrier barrier1 = new CyclicBarrier(2); + final CyclicBarrier barrier2 = new CyclicBarrier(2); + final CyclicBarrier barrier3 = new CyclicBarrier(2); + final List exceptions = Collections.synchronizedList(new ArrayList()); + + ChatConnection chatConnection = new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + String string = readAvailableString(reader); + assertEqual(exceptions, string, "Name: "); + writer.write("testClient1\n"); + waitForJoin(reader, "testClient1"); + barrier1.await(); + barrier2.await(); + string = readAvailableString(reader); + assertEqual(exceptions, string, "testClient2: Hello world!\n"); + barrier3.await(); + } + }; + + Thread client2 = new Thread(new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + String string = readAvailableString(reader); + assertEqual(exceptions, string, "Name: "); + barrier1.await(); + writer.write("testClient2\nHello world!\n"); + barrier2.await(); + barrier3.await(); + } + }); + + client2.start(); + chatConnection.run(); + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + } + + private static void performTestConnectDisconnectConnect() throws Exception { + final CyclicBarrier barrier1 = new CyclicBarrier(2); + final CyclicBarrier barrier2 = new CyclicBarrier(2); + final CyclicBarrier barrier3 = new CyclicBarrier(2); + final List exceptions = new ArrayList(); + + ChatConnection chatConnection = new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + String string = readAvailableString(reader); + assertEqual(exceptions, string, "Name: "); + writer.write("testClient1\n"); + } + }; + + ChatConnection chatConnection2 = new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + readAvailableString(reader); + writer.write("testClient1\n"); + waitForJoin(reader, "testClient1"); + barrier1.await(); + writer.write("Good morning!\n"); + barrier2.await(); + String string = readAvailableString(reader); + assertEqual(exceptions, string, "testClient2: Hello world!\n"); + barrier3.await(); + } + }; + + Thread client2 = new Thread(new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + readAvailableString(reader); + writer.write("testClient2\n"); + waitForJoin(reader, "testClient2"); + barrier1.await(); + writer.write("Hello world!\n"); + barrier2.await(); + String string = readAvailableString(reader); + assertEqual(exceptions, string, "testClient1: Good morning!\n"); + barrier3.await(); + } + }); + + client2.start(); + chatConnection.run(); + chatConnection2.run(); + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + } + + private static void performTestUseName() throws Exception { + final CyclicBarrier barrier1 = new CyclicBarrier(2); + final CyclicBarrier barrier2 = new CyclicBarrier(2); + final CyclicBarrier barrier3 = new CyclicBarrier(2); + final List exceptions = new ArrayList(); + + ChatConnection chatConnection = new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + String string = readAvailableString(reader); + if (!"Name: ".equals(string)) { + exceptions.add(new RuntimeException("Expected Name: ")); + } + writer.write("testClient1\n"); + waitForJoin(reader, "testClient1"); + barrier1.await(); + barrier2.await(); + string = readAvailableString(reader); + if (!"testClient2: Hello world!\n".equals(string)) { + exceptions.add(new RuntimeException("testClient2: Hello world!\n")); + } + barrier3.await(); + } + }; + + Thread client2 = new Thread(new ChatConnection() { + @Override + public void run(Socket socket, BufferedReader reader, Writer writer) throws Exception { + String string = readAvailableString(reader); + if (!"Name: ".equals(string)) { + exceptions.add(new RuntimeException("Expected Name: ")); + } + writer.write("testClient2\n"); + waitForJoin(reader, "testClient2"); + barrier1.await(); + writer.write("Hello world!\n"); + barrier2.await(); + barrier3.await(); + } + }); + + client2.start(); + chatConnection.run(); + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + } + + private static String readAvailableString(Reader reader) throws IOException { + return readAvailableString(reader, false); + } + + private static String readAvailableString(Reader reader, boolean now) throws IOException { + StringBuilder builder = new StringBuilder(); + int bytes; + if (now && !reader.ready()) { + return null; + } + do { + char[] buf = new char[256]; + bytes = reader.read(buf); + builder.append(buf, 0, bytes); + } while (bytes == 256); + return builder.toString(); + } + + private abstract static class ChatConnection implements Runnable { + public Exception exception; + + @Override + public void run() { + try (Socket socket = new Socket("localhost", listeningPort); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + Writer writer = new FlushingWriter(new OutputStreamWriter(socket.getOutputStream()))) { + socket.setTcpNoDelay(true); + + run(socket, reader, writer); + } catch (Exception e) { + exception = e; + } + } + + public abstract void run(Socket socket, BufferedReader reader, Writer writer) throws Exception; + } + + private static class FlushingWriter extends Writer { + public final Writer delegate; + + private FlushingWriter(Writer delegate) { + this.delegate = delegate; + } + + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + delegate.write(cbuf, off, len); + } + + @Override + public void flush() throws IOException { + delegate.flush(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public void write(String str) throws IOException { + super.write(str); + flush(); + } + } +}