jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java
changeset 47199 5b4ba31ce49b
parent 47198 898607275d6e
parent 47125 46ab150d59cd
child 47200 618e6ae80417
equal deleted inserted replaced
47198:898607275d6e 47199:5b4ba31ce49b
     1 /*
       
     2  * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package sun.rmi.transport.tcp;
       
    26 
       
    27 import java.io.*;
       
    28 import java.util.*;
       
    29 import java.rmi.server.LogStream;
       
    30 import java.security.PrivilegedAction;
       
    31 
       
    32 import sun.rmi.runtime.Log;
       
    33 
       
    34 /**
       
    35  * ConnectionMultiplexer manages the transparent multiplexing of
       
    36  * multiple virtual connections from one endpoint to another through
       
    37  * one given real connection to that endpoint.  The input and output
       
    38  * streams for the underlying real connection must be supplied.
       
    39  * A callback object is also supplied to be informed of new virtual
       
    40  * connections opened by the remote endpoint.  After creation, the
       
    41  * run() method must be called in a thread created for demultiplexing
       
    42  * the connections.  The openConnection() method is called to
       
    43  * initiate a virtual connection from this endpoint.
       
    44  *
       
    45  * @author Peter Jones
       
    46  */
       
    47 @SuppressWarnings("deprecation")
       
    48 final class ConnectionMultiplexer {
       
    49 
       
    50     /** "multiplex" log level */
       
    51     static int logLevel = LogStream.parseLevel(getLogLevel());
       
    52 
       
    53     private static String getLogLevel() {
       
    54         return java.security.AccessController.doPrivileged(
       
    55            (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel"));
       
    56     }
       
    57 
       
    58     /* multiplex system log */
       
    59     static final Log multiplexLog =
       
    60         Log.getLog("sun.rmi.transport.tcp.multiplex",
       
    61                    "multiplex", ConnectionMultiplexer.logLevel);
       
    62 
       
    63     /** multiplexing protocol operation codes */
       
    64     private final static int OPEN     = 0xE1;
       
    65     private final static int CLOSE    = 0xE2;
       
    66     private final static int CLOSEACK = 0xE3;
       
    67     private final static int REQUEST  = 0xE4;
       
    68     private final static int TRANSMIT = 0xE5;
       
    69 
       
    70     /** object to notify for new connections from remote endpoint */
       
    71     private TCPChannel channel;
       
    72 
       
    73     /** input stream for underlying single connection */
       
    74     private InputStream in;
       
    75 
       
    76     /** output stream for underlying single connection */
       
    77     private OutputStream out;
       
    78 
       
    79     /** true if underlying connection originated from this endpoint
       
    80         (used for generating unique connection IDs) */
       
    81     private boolean orig;
       
    82 
       
    83     /** layered stream for reading formatted data from underlying connection */
       
    84     private DataInputStream dataIn;
       
    85 
       
    86     /** layered stream for writing formatted data to underlying connection */
       
    87     private DataOutputStream dataOut;
       
    88 
       
    89     /** table holding currently open connection IDs and related info */
       
    90     private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
       
    91 
       
    92     /** number of currently open connections */
       
    93     private int numConnections = 0;
       
    94 
       
    95     /** maximum allowed open connections */
       
    96     private final static int maxConnections = 256;
       
    97 
       
    98     /** ID of last connection opened */
       
    99     private int lastID = 0x1001;
       
   100 
       
   101     /** true if this mechanism is still alive */
       
   102     private boolean alive = true;
       
   103 
       
   104     /**
       
   105      * Create a new ConnectionMultiplexer using the given underlying
       
   106      * input/output stream pair.  The run method must be called
       
   107      * (possibly on a new thread) to handle the demultiplexing.
       
   108      * @param channel object to notify when new connection is received
       
   109      * @param in input stream of underlying connection
       
   110      * @param out output stream of underlying connection
       
   111      * @param orig true if this endpoint intiated the underlying
       
   112      *        connection (needs to be set differently at both ends)
       
   113      */
       
   114     public ConnectionMultiplexer(
       
   115         TCPChannel    channel,
       
   116         InputStream   in,
       
   117         OutputStream  out,
       
   118         boolean       orig)
       
   119     {
       
   120         this.channel = channel;
       
   121         this.in      = in;
       
   122         this.out     = out;
       
   123         this.orig    = orig;
       
   124 
       
   125         dataIn = new DataInputStream(in);
       
   126         dataOut = new DataOutputStream(out);
       
   127     }
       
   128 
       
   129     /**
       
   130      * Process multiplexing protocol received from underlying connection.
       
   131      */
       
   132     public void run() throws IOException
       
   133     {
       
   134         try {
       
   135             int op, id, length;
       
   136             MultiplexConnectionInfo info;
       
   137 
       
   138             while (true) {
       
   139 
       
   140                 // read next op code from remote endpoint
       
   141                 op = dataIn.readUnsignedByte();
       
   142                 switch (op) {
       
   143 
       
   144                 // remote endpoint initiating new connection
       
   145                 case OPEN:
       
   146                     id = dataIn.readUnsignedShort();
       
   147 
       
   148                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   149                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
       
   150                     }
       
   151 
       
   152                     info = connectionTable.get(id);
       
   153                     if (info != null)
       
   154                         throw new IOException(
       
   155                             "OPEN: Connection ID already exists");
       
   156                     info = new MultiplexConnectionInfo(id);
       
   157                     info.in = new MultiplexInputStream(this, info, 2048);
       
   158                     info.out = new MultiplexOutputStream(this, info, 2048);
       
   159                     synchronized (connectionTable) {
       
   160                         connectionTable.put(id, info);
       
   161                         ++ numConnections;
       
   162                     }
       
   163                     sun.rmi.transport.Connection conn;
       
   164                     conn = new TCPConnection(channel, info.in, info.out);
       
   165                     channel.acceptMultiplexConnection(conn);
       
   166                     break;
       
   167 
       
   168                 // remote endpoint closing connection
       
   169                 case CLOSE:
       
   170                     id = dataIn.readUnsignedShort();
       
   171 
       
   172                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   173                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
       
   174                     }
       
   175 
       
   176                     info = connectionTable.get(id);
       
   177                     if (info == null)
       
   178                         throw new IOException(
       
   179                             "CLOSE: Invalid connection ID");
       
   180                     info.in.disconnect();
       
   181                     info.out.disconnect();
       
   182                     if (!info.closed)
       
   183                         sendCloseAck(info);
       
   184                     synchronized (connectionTable) {
       
   185                         connectionTable.remove(id);
       
   186                         -- numConnections;
       
   187                     }
       
   188                     break;
       
   189 
       
   190                 // remote endpoint acknowledging close of connection
       
   191                 case CLOSEACK:
       
   192                     id = dataIn.readUnsignedShort();
       
   193 
       
   194                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   195                         multiplexLog.log(Log.VERBOSE,
       
   196                             "operation  CLOSEACK " + id);
       
   197                     }
       
   198 
       
   199                     info = connectionTable.get(id);
       
   200                     if (info == null)
       
   201                         throw new IOException(
       
   202                             "CLOSEACK: Invalid connection ID");
       
   203                     if (!info.closed)
       
   204                         throw new IOException(
       
   205                             "CLOSEACK: Connection not closed");
       
   206                     info.in.disconnect();
       
   207                     info.out.disconnect();
       
   208                     synchronized (connectionTable) {
       
   209                         connectionTable.remove(id);
       
   210                         -- numConnections;
       
   211                     }
       
   212                     break;
       
   213 
       
   214                 // remote endpoint declaring additional bytes receivable
       
   215                 case REQUEST:
       
   216                     id = dataIn.readUnsignedShort();
       
   217                     info = connectionTable.get(id);
       
   218                     if (info == null)
       
   219                         throw new IOException(
       
   220                             "REQUEST: Invalid connection ID");
       
   221                     length = dataIn.readInt();
       
   222 
       
   223                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   224                         multiplexLog.log(Log.VERBOSE,
       
   225                             "operation  REQUEST " + id + ": " + length);
       
   226                     }
       
   227 
       
   228                     info.out.request(length);
       
   229                     break;
       
   230 
       
   231                 // remote endpoint transmitting data packet
       
   232                 case TRANSMIT:
       
   233                     id = dataIn.readUnsignedShort();
       
   234                     info = connectionTable.get(id);
       
   235                     if (info == null)
       
   236                         throw new IOException("SEND: Invalid connection ID");
       
   237                     length = dataIn.readInt();
       
   238 
       
   239                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
       
   240                         multiplexLog.log(Log.VERBOSE,
       
   241                             "operation  TRANSMIT " + id + ": " + length);
       
   242                     }
       
   243 
       
   244                     info.in.receive(length, dataIn);
       
   245                     break;
       
   246 
       
   247                 default:
       
   248                     throw new IOException("Invalid operation: " +
       
   249                                           Integer.toHexString(op));
       
   250                 }
       
   251             }
       
   252         } finally {
       
   253             shutDown();
       
   254         }
       
   255     }
       
   256 
       
   257     /**
       
   258      * Initiate a new multiplexed connection through the underlying
       
   259      * connection.
       
   260      */
       
   261     public synchronized TCPConnection openConnection() throws IOException
       
   262     {
       
   263         // generate ID that should not be already used
       
   264         // If all possible 32768 IDs are used,
       
   265         // this method will block searching for a new ID forever.
       
   266         int id;
       
   267         do {
       
   268             lastID = (++ lastID) & 0x7FFF;
       
   269             id = lastID;
       
   270 
       
   271             // The orig flag (copied to the high bit of the ID) is used
       
   272             // to have two distinct ranges to choose IDs from for the
       
   273             // two endpoints.
       
   274             if (orig)
       
   275                 id |= 0x8000;
       
   276         } while (connectionTable.get(id) != null);
       
   277 
       
   278         // create multiplexing streams and bookkeeping information
       
   279         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
       
   280         info.in = new MultiplexInputStream(this, info, 2048);
       
   281         info.out = new MultiplexOutputStream(this, info, 2048);
       
   282 
       
   283         // add to connection table if multiplexer has not died
       
   284         synchronized (connectionTable) {
       
   285             if (!alive)
       
   286                 throw new IOException("Multiplexer connection dead");
       
   287             if (numConnections >= maxConnections)
       
   288                 throw new IOException("Cannot exceed " + maxConnections +
       
   289                     " simultaneous multiplexed connections");
       
   290             connectionTable.put(id, info);
       
   291             ++ numConnections;
       
   292         }
       
   293 
       
   294         // inform remote endpoint of new connection
       
   295         synchronized (dataOut) {
       
   296             try {
       
   297                 dataOut.writeByte(OPEN);
       
   298                 dataOut.writeShort(id);
       
   299                 dataOut.flush();
       
   300             } catch (IOException e) {
       
   301                 multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   302 
       
   303                 shutDown();
       
   304                 throw e;
       
   305             }
       
   306         }
       
   307 
       
   308         return new TCPConnection(channel, info.in, info.out);
       
   309     }
       
   310 
       
   311     /**
       
   312      * Shut down all connections and clean up.
       
   313      */
       
   314     public void shutDown()
       
   315     {
       
   316         // inform all associated streams
       
   317         synchronized (connectionTable) {
       
   318             // return if multiplexer already officially dead
       
   319             if (!alive)
       
   320                 return;
       
   321             alive = false;
       
   322 
       
   323             Enumeration<MultiplexConnectionInfo> enum_ =
       
   324                     connectionTable.elements();
       
   325             while (enum_.hasMoreElements()) {
       
   326                 MultiplexConnectionInfo info = enum_.nextElement();
       
   327                 info.in.disconnect();
       
   328                 info.out.disconnect();
       
   329             }
       
   330             connectionTable.clear();
       
   331             numConnections = 0;
       
   332         }
       
   333 
       
   334         // close underlying connection, if possible (and not already done)
       
   335         try {
       
   336             in.close();
       
   337         } catch (IOException e) {
       
   338         }
       
   339         try {
       
   340             out.close();
       
   341         } catch (IOException e) {
       
   342         }
       
   343     }
       
   344 
       
   345     /**
       
   346      * Send request for more data on connection to remote endpoint.
       
   347      * @param info connection information structure
       
   348      * @param len number of more bytes that can be received
       
   349      */
       
   350     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
       
   351     {
       
   352         synchronized (dataOut) {
       
   353             if (alive && !info.closed)
       
   354                 try {
       
   355                     dataOut.writeByte(REQUEST);
       
   356                     dataOut.writeShort(info.id);
       
   357                     dataOut.writeInt(len);
       
   358                     dataOut.flush();
       
   359                 } catch (IOException e) {
       
   360                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   361 
       
   362                     shutDown();
       
   363                     throw e;
       
   364                 }
       
   365         }
       
   366     }
       
   367 
       
   368     /**
       
   369      * Send packet of requested data on connection to remote endpoint.
       
   370      * @param info connection information structure
       
   371      * @param buf array containing bytes to send
       
   372      * @param off offset of first array index of packet
       
   373      * @param len number of bytes in packet to send
       
   374      */
       
   375     void sendTransmit(MultiplexConnectionInfo info,
       
   376                       byte buf[], int off, int len) throws IOException
       
   377     {
       
   378         synchronized (dataOut) {
       
   379             if (alive && !info.closed)
       
   380                 try {
       
   381                     dataOut.writeByte(TRANSMIT);
       
   382                     dataOut.writeShort(info.id);
       
   383                     dataOut.writeInt(len);
       
   384                     dataOut.write(buf, off, len);
       
   385                     dataOut.flush();
       
   386                 } catch (IOException e) {
       
   387                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   388 
       
   389                     shutDown();
       
   390                     throw e;
       
   391                 }
       
   392         }
       
   393     }
       
   394 
       
   395     /**
       
   396      * Inform remote endpoint that connection has been closed.
       
   397      * @param info connection information structure
       
   398      */
       
   399     void sendClose(MultiplexConnectionInfo info) throws IOException
       
   400     {
       
   401         info.out.disconnect();
       
   402         synchronized (dataOut) {
       
   403             if (alive && !info.closed)
       
   404                 try {
       
   405                     dataOut.writeByte(CLOSE);
       
   406                     dataOut.writeShort(info.id);
       
   407                     dataOut.flush();
       
   408                     info.closed = true;
       
   409                 } catch (IOException e) {
       
   410                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   411 
       
   412                     shutDown();
       
   413                     throw e;
       
   414                 }
       
   415         }
       
   416     }
       
   417 
       
   418     /**
       
   419      * Acknowledge remote endpoint's closing of connection.
       
   420      * @param info connection information structure
       
   421      */
       
   422     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
       
   423     {
       
   424         synchronized (dataOut) {
       
   425             if (alive && !info.closed)
       
   426                 try {
       
   427                     dataOut.writeByte(CLOSEACK);
       
   428                     dataOut.writeShort(info.id);
       
   429                     dataOut.flush();
       
   430                     info.closed = true;
       
   431                 } catch (IOException e) {
       
   432                     multiplexLog.log(Log.BRIEF, "exception: ", e);
       
   433 
       
   434                     shutDown();
       
   435                     throw e;
       
   436                 }
       
   437         }
       
   438     }
       
   439 
       
   440     /**
       
   441      * Shut down connection upon finalization.
       
   442      */
       
   443     @SuppressWarnings("deprecation")
       
   444     protected void finalize() throws Throwable
       
   445     {
       
   446         super.finalize();
       
   447         shutDown();
       
   448     }
       
   449 }