jdk/src/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java
changeset 2 90ce3da70b43
child 51 6fe31bc95bbc
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java	Sat Dec 01 00:00:00 2007 +0000
@@ -0,0 +1,459 @@
+/*
+ * Copyright 1996-2003 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+package sun.rmi.transport.tcp;
+
+import java.io.*;
+import java.util.*;
+import java.rmi.server.LogStream;
+
+import sun.rmi.runtime.Log;
+
+/**
+ * ConnectionMultiplexer manages the transparent multiplexing of
+ * multiple virtual connections from one endpoint to another through
+ * one given real connection to that endpoint.  The input and output
+ * streams for the the underlying real connection must be supplied.
+ * A callback object is also supplied to be informed of new virtual
+ * connections opened by the remote endpoint.  After creation, the
+ * run() method must be called in a thread created for demultiplexing
+ * the connections.  The openConnection() method is called to
+ * initiate a virtual connection from this endpoint.
+ *
+ * @author Peter Jones
+ */
+final class ConnectionMultiplexer {
+
+    /** "multiplex" log level */
+    static int logLevel = LogStream.parseLevel(getLogLevel());
+
+    private static String getLogLevel() {
+        return (String) java.security.AccessController.doPrivileged(
+            new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
+    }
+
+    /* multiplex system log */
+    static final Log multiplexLog =
+        Log.getLog("sun.rmi.transport.tcp.multiplex",
+                   "multiplex", ConnectionMultiplexer.logLevel);
+
+    /** multiplexing protocol operation codes */
+    private final static int OPEN     = 0xE1;
+    private final static int CLOSE    = 0xE2;
+    private final static int CLOSEACK = 0xE3;
+    private final static int REQUEST  = 0xE4;
+    private final static int TRANSMIT = 0xE5;
+
+    /** object to notify for new connections from remote endpoint */
+    private TCPChannel channel;
+
+    /** input stream for underlying single connection */
+    private InputStream in;
+
+    /** output stream for underlying single connection */
+    private OutputStream out;
+
+    /** true if underlying connection originated from this endpoint
+        (used for generating unique connection IDs) */
+    private boolean orig;
+
+    /** layered stream for reading formatted data from underlying connection */
+    private DataInputStream dataIn;
+
+    /** layered stream for writing formatted data to underlying connection */
+    private DataOutputStream dataOut;
+
+    /** table holding currently open connection IDs and related info */
+    private Hashtable connectionTable = new Hashtable(7);
+
+    /** number of currently open connections */
+    private int numConnections = 0;
+
+    /** maximum allowed open connections */
+    private final static int maxConnections = 256;
+
+    /** ID of last connection opened */
+    private int lastID = 0x1001;
+
+    /** true if this mechanism is still alive */
+    private boolean alive = true;
+
+    /**
+     * Create a new ConnectionMultiplexer using the given underlying
+     * input/output stream pair.  The run method must be called
+     * (possibly on a new thread) to handle the demultiplexing.
+     * @param channel object to notify when new connection is received
+     * @param in input stream of underlying connection
+     * @param out output stream of underlying connection
+     * @param orig true if this endpoint intiated the underlying
+     *        connection (needs to be set differently at both ends)
+     */
+    public ConnectionMultiplexer(
+        TCPChannel    channel,
+        InputStream   in,
+        OutputStream  out,
+        boolean       orig)
+    {
+        this.channel = channel;
+        this.in      = in;
+        this.out     = out;
+        this.orig    = orig;
+
+        dataIn = new DataInputStream(in);
+        dataOut = new DataOutputStream(out);
+    }
+
+    /**
+     * Process multiplexing protocol received from underlying connection.
+     */
+    public void run() throws IOException
+    {
+        try {
+            int op, id, length;
+            Integer idObj;
+            MultiplexConnectionInfo info;
+
+            while (true) {
+
+                // read next op code from remote endpoint
+                op = dataIn.readUnsignedByte();
+                switch (op) {
+
+                // remote endpoint initiating new connection
+                case OPEN:
+                    id = dataIn.readUnsignedShort();
+
+                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
+                        multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
+                    }
+
+                    idObj = new Integer(id);
+                    info =
+                        (MultiplexConnectionInfo) connectionTable.get(idObj);
+                    if (info != null)
+                        throw new IOException(
+                            "OPEN: Connection ID already exists");
+                    info = new MultiplexConnectionInfo(id);
+                    info.in = new MultiplexInputStream(this, info, 2048);
+                    info.out = new MultiplexOutputStream(this, info, 2048);
+                    synchronized (connectionTable) {
+                        connectionTable.put(idObj, info);
+                        ++ numConnections;
+                    }
+                    sun.rmi.transport.Connection conn;
+                    conn = new TCPConnection(channel, info.in, info.out);
+                    channel.acceptMultiplexConnection(conn);
+                    break;
+
+                // remote endpoint closing connection
+                case CLOSE:
+                    id = dataIn.readUnsignedShort();
+
+                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
+                        multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
+                    }
+
+                    idObj = new Integer(id);
+                    info =
+                        (MultiplexConnectionInfo) connectionTable.get(idObj);
+                    if (info == null)
+                        throw new IOException(
+                            "CLOSE: Invalid connection ID");
+                    info.in.disconnect();
+                    info.out.disconnect();
+                    if (!info.closed)
+                        sendCloseAck(info);
+                    synchronized (connectionTable) {
+                        connectionTable.remove(idObj);
+                        -- numConnections;
+                    }
+                    break;
+
+                // remote endpoint acknowledging close of connection
+                case CLOSEACK:
+                    id = dataIn.readUnsignedShort();
+
+                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
+                        multiplexLog.log(Log.VERBOSE,
+                            "operation  CLOSEACK " + id);
+                    }
+
+                    idObj = new Integer(id);
+                    info =
+                        (MultiplexConnectionInfo) connectionTable.get(idObj);
+                    if (info == null)
+                        throw new IOException(
+                            "CLOSEACK: Invalid connection ID");
+                    if (!info.closed)
+                        throw new IOException(
+                            "CLOSEACK: Connection not closed");
+                    info.in.disconnect();
+                    info.out.disconnect();
+                    synchronized (connectionTable) {
+                        connectionTable.remove(idObj);
+                        -- numConnections;
+                    }
+                    break;
+
+                // remote endpoint declaring additional bytes receivable
+                case REQUEST:
+                    id = dataIn.readUnsignedShort();
+                    idObj = new Integer(id);
+                    info =
+                        (MultiplexConnectionInfo) connectionTable.get(idObj);
+                    if (info == null)
+                        throw new IOException(
+                            "REQUEST: Invalid connection ID");
+                    length = dataIn.readInt();
+
+                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
+                        multiplexLog.log(Log.VERBOSE,
+                            "operation  REQUEST " + id + ": " + length);
+                    }
+
+                    info.out.request(length);
+                    break;
+
+                // remote endpoint transmitting data packet
+                case TRANSMIT:
+                    id = dataIn.readUnsignedShort();
+                    idObj = new Integer(id);
+                    info =
+                        (MultiplexConnectionInfo) connectionTable.get(idObj);
+                    if (info == null)
+                        throw new IOException("SEND: Invalid connection ID");
+                    length = dataIn.readInt();
+
+                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
+                        multiplexLog.log(Log.VERBOSE,
+                            "operation  TRANSMIT " + id + ": " + length);
+                    }
+
+                    info.in.receive(length, dataIn);
+                    break;
+
+                default:
+                    throw new IOException("Invalid operation: " +
+                                          Integer.toHexString(op));
+                }
+            }
+        } finally {
+            shutDown();
+        }
+    }
+
+    /**
+     * Initiate a new multiplexed connection through the underlying
+     * connection.
+     */
+    public synchronized TCPConnection openConnection() throws IOException
+    {
+        // generate ID that should not be already used
+        // If all possible 32768 IDs are used,
+        // this method will block searching for a new ID forever.
+        int id;
+        Integer idObj;
+        do {
+            lastID = (++ lastID) & 0x7FFF;
+            id = lastID;
+
+            // The orig flag (copied to the high bit of the ID) is used
+            // to have two distinct ranges to choose IDs from for the
+            // two endpoints.
+            if (orig)
+                id |= 0x8000;
+            idObj = new Integer(id);
+        } while (connectionTable.get(idObj) != null);
+
+        // create multiplexing streams and bookkeeping information
+        MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
+        info.in = new MultiplexInputStream(this, info, 2048);
+        info.out = new MultiplexOutputStream(this, info, 2048);
+
+        // add to connection table if multiplexer has not died
+        synchronized (connectionTable) {
+            if (!alive)
+                throw new IOException("Multiplexer connection dead");
+            if (numConnections >= maxConnections)
+                throw new IOException("Cannot exceed " + maxConnections +
+                    " simultaneous multiplexed connections");
+            connectionTable.put(idObj, info);
+            ++ numConnections;
+        }
+
+        // inform remote endpoint of new connection
+        synchronized (dataOut) {
+            try {
+                dataOut.writeByte(OPEN);
+                dataOut.writeShort(id);
+                dataOut.flush();
+            } catch (IOException e) {
+                multiplexLog.log(Log.BRIEF, "exception: ", e);
+
+                shutDown();
+                throw e;
+            }
+        }
+
+        return new TCPConnection(channel, info.in, info.out);
+    }
+
+    /**
+     * Shut down all connections and clean up.
+     */
+    public void shutDown()
+    {
+        // inform all associated streams
+        synchronized (connectionTable) {
+            // return if multiplexer already officially dead
+            if (!alive)
+                return;
+            alive = false;
+
+            Enumeration enum_ = connectionTable.elements();
+            while (enum_.hasMoreElements()) {
+                MultiplexConnectionInfo info =
+                    (MultiplexConnectionInfo) enum_.nextElement();
+                info.in.disconnect();
+                info.out.disconnect();
+            }
+            connectionTable.clear();
+            numConnections = 0;
+        }
+
+        // close underlying connection, if possible (and not already done)
+        try {
+            in.close();
+        } catch (IOException e) {
+        }
+        try {
+            out.close();
+        } catch (IOException e) {
+        }
+    }
+
+    /**
+     * Send request for more data on connection to remote endpoint.
+     * @param info connection information structure
+     * @param len number of more bytes that can be received
+     */
+    void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
+    {
+        synchronized (dataOut) {
+            if (alive && !info.closed)
+                try {
+                    dataOut.writeByte(REQUEST);
+                    dataOut.writeShort(info.id);
+                    dataOut.writeInt(len);
+                    dataOut.flush();
+                } catch (IOException e) {
+                    multiplexLog.log(Log.BRIEF, "exception: ", e);
+
+                    shutDown();
+                    throw e;
+                }
+        }
+    }
+
+    /**
+     * Send packet of requested data on connection to remote endpoint.
+     * @param info connection information structure
+     * @param buf array containg bytes to send
+     * @param off offset of first array index of packet
+     * @param len number of bytes in packet to send
+     */
+    void sendTransmit(MultiplexConnectionInfo info,
+                      byte buf[], int off, int len) throws IOException
+    {
+        synchronized (dataOut) {
+            if (alive && !info.closed)
+                try {
+                    dataOut.writeByte(TRANSMIT);
+                    dataOut.writeShort(info.id);
+                    dataOut.writeInt(len);
+                    dataOut.write(buf, off, len);
+                    dataOut.flush();
+                } catch (IOException e) {
+                    multiplexLog.log(Log.BRIEF, "exception: ", e);
+
+                    shutDown();
+                    throw e;
+                }
+        }
+    }
+
+    /**
+     * Inform remote endpoint that connection has been closed.
+     * @param info connection information structure
+     */
+    void sendClose(MultiplexConnectionInfo info) throws IOException
+    {
+        info.out.disconnect();
+        synchronized (dataOut) {
+            if (alive && !info.closed)
+                try {
+                    dataOut.writeByte(CLOSE);
+                    dataOut.writeShort(info.id);
+                    dataOut.flush();
+                    info.closed = true;
+                } catch (IOException e) {
+                    multiplexLog.log(Log.BRIEF, "exception: ", e);
+
+                    shutDown();
+                    throw e;
+                }
+        }
+    }
+
+    /**
+     * Acknowledge remote endpoint's closing of connection.
+     * @param info connection information structure
+     */
+    void sendCloseAck(MultiplexConnectionInfo info) throws IOException
+    {
+        synchronized (dataOut) {
+            if (alive && !info.closed)
+                try {
+                    dataOut.writeByte(CLOSEACK);
+                    dataOut.writeShort(info.id);
+                    dataOut.flush();
+                    info.closed = true;
+                } catch (IOException e) {
+                    multiplexLog.log(Log.BRIEF, "exception: ", e);
+
+                    shutDown();
+                    throw e;
+                }
+        }
+    }
+
+    /**
+     * Shut down connection upon finalization.
+     */
+    protected void finalize() throws Throwable
+    {
+        super.finalize();
+        shutDown();
+    }
+}