jdk/src/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java
changeset 2 90ce3da70b43
child 51 6fe31bc95bbc
equal deleted inserted replaced
0:fd16c54261b3 2:90ce3da70b43
       
     1 /*
       
     2  * Copyright 1996-2003 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 package sun.rmi.transport.tcp;
       
    26 
       
    27 import java.io.*;
       
    28 import java.util.*;
       
    29 import java.rmi.server.LogStream;
       
    30 
       
    31 import sun.rmi.runtime.Log;
       
    32 
       
    33 /**
       
    34  * ConnectionMultiplexer manages the transparent multiplexing of
       
    35  * multiple virtual connections from one endpoint to another through
       
    36  * one given real connection to that endpoint.  The input and output
       
    37  * streams for the the underlying real connection must be supplied.
       
    38  * A callback object is also supplied to be informed of new virtual
       
    39  * connections opened by the remote endpoint.  After creation, the
       
    40  * run() method must be called in a thread created for demultiplexing
       
    41  * the connections.  The openConnection() method is called to
       
    42  * initiate a virtual connection from this endpoint.
       
    43  *
       
    44  * @author Peter Jones
       
    45  */
       
    46 final class ConnectionMultiplexer {
       
    47 
       
    48     /** "multiplex" log level */
       
    49     static int logLevel = LogStream.parseLevel(getLogLevel());
       
    50 
       
    51     private static String getLogLevel() {
       
    52         return (String) java.security.AccessController.doPrivileged(
       
    53             new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
       
    54     }
       
    55 
       
    56     /* multiplex system log */
       
    57     static final Log multiplexLog =
       
    58         Log.getLog("sun.rmi.transport.tcp.multiplex",
       
    59                    "multiplex", ConnectionMultiplexer.logLevel);
       
    60 
       
    61     /** multiplexing protocol operation codes */
       
    62     private final static int OPEN     = 0xE1;
       
    63     private final static int CLOSE    = 0xE2;
       
    64     private final static int CLOSEACK = 0xE3;
       
    65     private final static int REQUEST  = 0xE4;
       
    66     private final static int TRANSMIT = 0xE5;
       
    67 
       
    68     /** object to notify for new connections from remote endpoint */
       
    69     private TCPChannel channel;
       
    70 
       
    71     /** input stream for underlying single connection */
       
    72     private InputStream in;
       
    73 
       
    74     /** output stream for underlying single connection */
       
    75     private OutputStream out;
       
    76 
       
    77     /** true if underlying connection originated from this endpoint
       
    78         (used for generating unique connection IDs) */
       
    79     private boolean orig;
       
    80 
       
    81     /** layered stream for reading formatted data from underlying connection */
       
    82     private DataInputStream dataIn;
       
    83 
       
    84     /** layered stream for writing formatted data to underlying connection */
       
    85     private DataOutputStream dataOut;
       
    86 
       
    87     /** table holding currently open connection IDs and related info */
       
    88     private Hashtable connectionTable = new Hashtable(7);
       
    89 
       
    90     /** number of currently open connections */
       
    91     private int numConnections = 0;
       
    92 
       
    93     /** maximum allowed open connections */
       
    94     private final static int maxConnections = 256;
       
    95 
       
    96     /** ID of last connection opened */
       
    97     private int lastID = 0x1001;
       
    98 
       
    99     /** true if this mechanism is still alive */
       
   100     private boolean alive = true;
       
   101 
       
   102     /**
       
   103      * Create a new ConnectionMultiplexer using the given underlying
       
   104      * input/output stream pair.  The run method must be called
       
   105      * (possibly on a new thread) to handle the demultiplexing.
       
   106      * @param channel object to notify when new connection is received
       
   107      * @param in input stream of underlying connection
       
   108      * @param out output stream of underlying connection
       
   109      * @param orig true if this endpoint intiated the underlying
       
   110      *        connection (needs to be set differently at both ends)
       
   111      */
       
   112     public ConnectionMultiplexer(
       
   113         TCPChannel    channel,
       
   114         InputStream   in,
       
   115         OutputStream  out,
       
   116         boolean       orig)
       
   117     {
       
   118         this.channel = channel;
       
   119         this.in      = in;
       
   120         this.out     = out;
       
   121         this.orig    = orig;
       
   122 
       
   123         dataIn = new DataInputStream(in);
       
   124         dataOut = new DataOutputStream(out);
       
   125     }
       
   126 
       
   127     /**
       
   128      * Process multiplexing protocol received from underlying connection.
       
   129      */
       
   130     public void run() throws IOException
       
   131     {
       
   132         try {
       
   133             int op, id, length;
       
   134             Integer idObj;
       
   135             MultiplexConnectionInfo info;
       
   136 
       
   137             while (true) {
       
   138 
       
   139                 // read next op code from remote endpoint
       
   140                 op = dataIn.readUnsignedByte();
       
   141                 switch (op) {
       
   142 
       
   143                 // remote endpoint initiating new connection
       
   144                 case OPEN:
       
   145                     id = dataIn.readUnsignedShort();
       
   146 
       
   147                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   148                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
       
   149                     }
       
   150 
       
   151                     idObj = new Integer(id);
       
   152                     info =
       
   153                         (MultiplexConnectionInfo) connectionTable.get(idObj);
       
   154                     if (info != null)
       
   155                         throw new IOException(
       
   156                             "OPEN: Connection ID already exists");
       
   157                     info = new MultiplexConnectionInfo(id);
       
   158                     info.in = new MultiplexInputStream(this, info, 2048);
       
   159                     info.out = new MultiplexOutputStream(this, info, 2048);
       
   160                     synchronized (connectionTable) {
       
   161                         connectionTable.put(idObj, info);
       
   162                         ++ numConnections;
       
   163                     }
       
   164                     sun.rmi.transport.Connection conn;
       
   165                     conn = new TCPConnection(channel, info.in, info.out);
       
   166                     channel.acceptMultiplexConnection(conn);
       
   167                     break;
       
   168 
       
   169                 // remote endpoint closing connection
       
   170                 case CLOSE:
       
   171                     id = dataIn.readUnsignedShort();
       
   172 
       
   173                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   174                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
       
   175                     }
       
   176 
       
   177                     idObj = new Integer(id);
       
   178                     info =
       
   179                         (MultiplexConnectionInfo) connectionTable.get(idObj);
       
   180                     if (info == null)
       
   181                         throw new IOException(
       
   182                             "CLOSE: Invalid connection ID");
       
   183                     info.in.disconnect();
       
   184                     info.out.disconnect();
       
   185                     if (!info.closed)
       
   186                         sendCloseAck(info);
       
   187                     synchronized (connectionTable) {
       
   188                         connectionTable.remove(idObj);
       
   189                         -- numConnections;
       
   190                     }
       
   191                     break;
       
   192 
       
   193                 // remote endpoint acknowledging close of connection
       
   194                 case CLOSEACK:
       
   195                     id = dataIn.readUnsignedShort();
       
   196 
       
   197                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   198                         multiplexLog.log(Log.VERBOSE,
       
   199                             "operation  CLOSEACK " + id);
       
   200                     }
       
   201 
       
   202                     idObj = new Integer(id);
       
   203                     info =
       
   204                         (MultiplexConnectionInfo) connectionTable.get(idObj);
       
   205                     if (info == null)
       
   206                         throw new IOException(
       
   207                             "CLOSEACK: Invalid connection ID");
       
   208                     if (!info.closed)
       
   209                         throw new IOException(
       
   210                             "CLOSEACK: Connection not closed");
       
   211                     info.in.disconnect();
       
   212                     info.out.disconnect();
       
   213                     synchronized (connectionTable) {
       
   214                         connectionTable.remove(idObj);
       
   215                         -- numConnections;
       
   216                     }
       
   217                     break;
       
   218 
       
   219                 // remote endpoint declaring additional bytes receivable
       
   220                 case REQUEST:
       
   221                     id = dataIn.readUnsignedShort();
       
   222                     idObj = new Integer(id);
       
   223                     info =
       
   224                         (MultiplexConnectionInfo) connectionTable.get(idObj);
       
   225                     if (info == null)
       
   226                         throw new IOException(
       
   227                             "REQUEST: Invalid connection ID");
       
   228                     length = dataIn.readInt();
       
   229 
       
   230                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   231                         multiplexLog.log(Log.VERBOSE,
       
   232                             "operation  REQUEST " + id + ": " + length);
       
   233                     }
       
   234 
       
   235                     info.out.request(length);
       
   236                     break;
       
   237 
       
   238                 // remote endpoint transmitting data packet
       
   239                 case TRANSMIT:
       
   240                     id = dataIn.readUnsignedShort();
       
   241                     idObj = new Integer(id);
       
   242                     info =
       
   243                         (MultiplexConnectionInfo) connectionTable.get(idObj);
       
   244                     if (info == null)
       
   245                         throw new IOException("SEND: Invalid connection ID");
       
   246                     length = dataIn.readInt();
       
   247 
       
   248                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   249                         multiplexLog.log(Log.VERBOSE,
       
   250                             "operation  TRANSMIT " + id + ": " + length);
       
   251                     }
       
   252 
       
   253                     info.in.receive(length, dataIn);
       
   254                     break;
       
   255 
       
   256                 default:
       
   257                     throw new IOException("Invalid operation: " +
       
   258                                           Integer.toHexString(op));
       
   259                 }
       
   260             }
       
   261         } finally {
       
   262             shutDown();
       
   263         }
       
   264     }
       
   265 
       
   266     /**
       
   267      * Initiate a new multiplexed connection through the underlying
       
   268      * connection.
       
   269      */
       
   270     public synchronized TCPConnection openConnection() throws IOException
       
   271     {
       
   272         // generate ID that should not be already used
       
   273         // If all possible 32768 IDs are used,
       
   274         // this method will block searching for a new ID forever.
       
   275         int id;
       
   276         Integer idObj;
       
   277         do {
       
   278             lastID = (++ lastID) & 0x7FFF;
       
   279             id = lastID;
       
   280 
       
   281             // The orig flag (copied to the high bit of the ID) is used
       
   282             // to have two distinct ranges to choose IDs from for the
       
   283             // two endpoints.
       
   284             if (orig)
       
   285                 id |= 0x8000;
       
   286             idObj = new Integer(id);
       
   287         } while (connectionTable.get(idObj) != null);
       
   288 
       
   289         // create multiplexing streams and bookkeeping information
       
   290         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
       
   291         info.in = new MultiplexInputStream(this, info, 2048);
       
   292         info.out = new MultiplexOutputStream(this, info, 2048);
       
   293 
       
   294         // add to connection table if multiplexer has not died
       
   295         synchronized (connectionTable) {
       
   296             if (!alive)
       
   297                 throw new IOException("Multiplexer connection dead");
       
   298             if (numConnections >= maxConnections)
       
   299                 throw new IOException("Cannot exceed " + maxConnections +
       
   300                     " simultaneous multiplexed connections");
       
   301             connectionTable.put(idObj, info);
       
   302             ++ numConnections;
       
   303         }
       
   304 
       
   305         // inform remote endpoint of new connection
       
   306         synchronized (dataOut) {
       
   307             try {
       
   308                 dataOut.writeByte(OPEN);
       
   309                 dataOut.writeShort(id);
       
   310                 dataOut.flush();
       
   311             } catch (IOException e) {
       
   312                 multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   313 
       
   314                 shutDown();
       
   315                 throw e;
       
   316             }
       
   317         }
       
   318 
       
   319         return new TCPConnection(channel, info.in, info.out);
       
   320     }
       
   321 
       
   322     /**
       
   323      * Shut down all connections and clean up.
       
   324      */
       
   325     public void shutDown()
       
   326     {
       
   327         // inform all associated streams
       
   328         synchronized (connectionTable) {
       
   329             // return if multiplexer already officially dead
       
   330             if (!alive)
       
   331                 return;
       
   332             alive = false;
       
   333 
       
   334             Enumeration enum_ = connectionTable.elements();
       
   335             while (enum_.hasMoreElements()) {
       
   336                 MultiplexConnectionInfo info =
       
   337                     (MultiplexConnectionInfo) enum_.nextElement();
       
   338                 info.in.disconnect();
       
   339                 info.out.disconnect();
       
   340             }
       
   341             connectionTable.clear();
       
   342             numConnections = 0;
       
   343         }
       
   344 
       
   345         // close underlying connection, if possible (and not already done)
       
   346         try {
       
   347             in.close();
       
   348         } catch (IOException e) {
       
   349         }
       
   350         try {
       
   351             out.close();
       
   352         } catch (IOException e) {
       
   353         }
       
   354     }
       
   355 
       
   356     /**
       
   357      * Send request for more data on connection to remote endpoint.
       
   358      * @param info connection information structure
       
   359      * @param len number of more bytes that can be received
       
   360      */
       
   361     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
       
   362     {
       
   363         synchronized (dataOut) {
       
   364             if (alive && !info.closed)
       
   365                 try {
       
   366                     dataOut.writeByte(REQUEST);
       
   367                     dataOut.writeShort(info.id);
       
   368                     dataOut.writeInt(len);
       
   369                     dataOut.flush();
       
   370                 } catch (IOException e) {
       
   371                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   372 
       
   373                     shutDown();
       
   374                     throw e;
       
   375                 }
       
   376         }
       
   377     }
       
   378 
       
   379     /**
       
   380      * Send packet of requested data on connection to remote endpoint.
       
   381      * @param info connection information structure
       
   382      * @param buf array containg bytes to send
       
   383      * @param off offset of first array index of packet
       
   384      * @param len number of bytes in packet to send
       
   385      */
       
   386     void sendTransmit(MultiplexConnectionInfo info,
       
   387                       byte buf[], int off, int len) throws IOException
       
   388     {
       
   389         synchronized (dataOut) {
       
   390             if (alive && !info.closed)
       
   391                 try {
       
   392                     dataOut.writeByte(TRANSMIT);
       
   393                     dataOut.writeShort(info.id);
       
   394                     dataOut.writeInt(len);
       
   395                     dataOut.write(buf, off, len);
       
   396                     dataOut.flush();
       
   397                 } catch (IOException e) {
       
   398                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   399 
       
   400                     shutDown();
       
   401                     throw e;
       
   402                 }
       
   403         }
       
   404     }
       
   405 
       
   406     /**
       
   407      * Inform remote endpoint that connection has been closed.
       
   408      * @param info connection information structure
       
   409      */
       
   410     void sendClose(MultiplexConnectionInfo info) throws IOException
       
   411     {
       
   412         info.out.disconnect();
       
   413         synchronized (dataOut) {
       
   414             if (alive && !info.closed)
       
   415                 try {
       
   416                     dataOut.writeByte(CLOSE);
       
   417                     dataOut.writeShort(info.id);
       
   418                     dataOut.flush();
       
   419                     info.closed = true;
       
   420                 } catch (IOException e) {
       
   421                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   422 
       
   423                     shutDown();
       
   424                     throw e;
       
   425                 }
       
   426         }
       
   427     }
       
   428 
       
   429     /**
       
   430      * Acknowledge remote endpoint's closing of connection.
       
   431      * @param info connection information structure
       
   432      */
       
   433     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
       
   434     {
       
   435         synchronized (dataOut) {
       
   436             if (alive && !info.closed)
       
   437                 try {
       
   438                     dataOut.writeByte(CLOSEACK);
       
   439                     dataOut.writeShort(info.id);
       
   440                     dataOut.flush();
       
   441                     info.closed = true;
       
   442                 } catch (IOException e) {
       
   443                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   444 
       
   445                     shutDown();
       
   446                     throw e;
       
   447                 }
       
   448         }
       
   449     }
       
   450 
       
   451     /**
       
   452      * Shut down connection upon finalization.
       
   453      */
       
   454     protected void finalize() throws Throwable
       
   455     {
       
   456         super.finalize();
       
   457         shutDown();
       
   458     }
       
   459 }