jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java
changeset 25859 3317bb8137f4
parent 23333 b0af2c7c8c91
child 45333 fd3fdfe96bb4
equal deleted inserted replaced
25858:836adbf7a2cd 25859:3317bb8137f4
       
     1 /*
       
     2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package sun.rmi.transport.tcp;
       
    26 
       
    27 import java.io.DataInputStream;
       
    28 import java.io.DataOutputStream;
       
    29 import java.io.IOException;
       
    30 import java.lang.ref.Reference;
       
    31 import java.lang.ref.SoftReference;
       
    32 import java.net.Socket;
       
    33 import java.rmi.ConnectIOException;
       
    34 import java.rmi.RemoteException;
       
    35 import java.security.AccessControlContext;
       
    36 import java.security.AccessController;
       
    37 import java.security.PrivilegedAction;
       
    38 import java.util.ArrayList;
       
    39 import java.util.List;
       
    40 import java.util.ListIterator;
       
    41 import java.util.WeakHashMap;
       
    42 import java.util.concurrent.Future;
       
    43 import java.util.concurrent.ScheduledExecutorService;
       
    44 import java.util.concurrent.TimeUnit;
       
    45 import sun.rmi.runtime.Log;
       
    46 import sun.rmi.runtime.NewThreadAction;
       
    47 import sun.rmi.runtime.RuntimeUtil;
       
    48 import sun.rmi.transport.Channel;
       
    49 import sun.rmi.transport.Connection;
       
    50 import sun.rmi.transport.Endpoint;
       
    51 import sun.rmi.transport.TransportConstants;
       
    52 
       
    53 /**
       
    54  * TCPChannel is the socket-based implementation of the RMI Channel
       
    55  * abstraction.
       
    56  *
       
    57  * @author Ann Wollrath
       
    58  */
       
    59 public class TCPChannel implements Channel {
       
    60     /** endpoint for this channel */
       
    61     private final TCPEndpoint ep;
       
    62     /** transport for this channel */
       
    63     private final TCPTransport tr;
       
    64     /** list of cached connections */
       
    65     private final List<TCPConnection> freeList =
       
    66         new ArrayList<>();
       
    67     /** frees cached connections that have expired (guarded by freeList) */
       
    68     private Future<?> reaper = null;
       
    69 
       
    70     /** using multiplexer (for bi-directional applet communication */
       
    71     private boolean usingMultiplexer = false;
       
    72     /** connection multiplexer, if used */
       
    73     private ConnectionMultiplexer multiplexer = null;
       
    74     /** connection acceptor (should be in TCPTransport) */
       
    75     private ConnectionAcceptor acceptor;
       
    76 
       
    77     /** most recently authorized AccessControlContext */
       
    78     private AccessControlContext okContext;
       
    79 
       
    80     /** cache of authorized AccessControlContexts */
       
    81     private WeakHashMap<AccessControlContext,
       
    82                         Reference<AccessControlContext>> authcache;
       
    83 
       
    84     /** the SecurityManager which authorized okContext and authcache */
       
    85     private SecurityManager cacheSecurityManager = null;
       
    86 
       
    87     /** client-side connection idle usage timeout */
       
    88     private static final long idleTimeout =             // default 15 seconds
       
    89         AccessController.doPrivileged((PrivilegedAction<Long>) () ->
       
    90             Long.getLong("sun.rmi.transport.connectionTimeout", 15000));
       
    91 
       
    92     /** client-side connection handshake read timeout */
       
    93     private static final int handshakeTimeout =         // default 1 minute
       
    94         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
       
    95             Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000));
       
    96 
       
    97     /** client-side connection response read timeout (after handshake) */
       
    98     private static final int responseTimeout =          // default infinity
       
    99         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
       
   100             Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0));
       
   101 
       
   102     /** thread pool for scheduling delayed tasks */
       
   103     private static final ScheduledExecutorService scheduler =
       
   104         AccessController.doPrivileged(
       
   105             new RuntimeUtil.GetInstanceAction()).getScheduler();
       
   106 
       
   107     /**
       
   108      * Create channel for endpoint.
       
   109      */
       
   110     TCPChannel(TCPTransport tr, TCPEndpoint ep) {
       
   111         this.tr = tr;
       
   112         this.ep = ep;
       
   113     }
       
   114 
       
   115     /**
       
   116      * Return the endpoint for this channel.
       
   117      */
       
   118     public Endpoint getEndpoint() {
       
   119         return ep;
       
   120     }
       
   121 
       
   122     /**
       
   123      * Checks if the current caller has sufficient privilege to make
       
   124      * a connection to the remote endpoint.
       
   125      * @exception SecurityException if caller is not allowed to use this
       
   126      * Channel.
       
   127      */
       
   128     private void checkConnectPermission() throws SecurityException {
       
   129         SecurityManager security = System.getSecurityManager();
       
   130         if (security == null)
       
   131             return;
       
   132 
       
   133         if (security != cacheSecurityManager) {
       
   134             // The security manager changed: flush the cache
       
   135             okContext = null;
       
   136             authcache = new WeakHashMap<AccessControlContext,
       
   137                                         Reference<AccessControlContext>>();
       
   138             cacheSecurityManager = security;
       
   139         }
       
   140 
       
   141         AccessControlContext ctx = AccessController.getContext();
       
   142 
       
   143         // If ctx is the same context as last time, or if it
       
   144         // appears in the cache, bypass the checkConnect.
       
   145         if (okContext == null ||
       
   146             !(okContext.equals(ctx) || authcache.containsKey(ctx)))
       
   147         {
       
   148             security.checkConnect(ep.getHost(), ep.getPort());
       
   149             authcache.put(ctx, new SoftReference<AccessControlContext>(ctx));
       
   150             // A WeakHashMap is transformed into a SoftHashSet by making
       
   151             // each value softly refer to its own key (Peter's idea).
       
   152         }
       
   153         okContext = ctx;
       
   154     }
       
   155 
       
   156     /**
       
   157      * Supplies a connection to the endpoint of the address space
       
   158      * for which this is a channel.  The returned connection may
       
   159      * be one retrieved from a cache of idle connections.
       
   160      */
       
   161     public Connection newConnection() throws RemoteException {
       
   162         TCPConnection conn;
       
   163 
       
   164         // loop until we find a free live connection (in which case
       
   165         // we return) or until we run out of freelist (in which case
       
   166         // the loop exits)
       
   167         do {
       
   168             conn = null;
       
   169             // try to get a free connection
       
   170             synchronized (freeList) {
       
   171                 int elementPos = freeList.size()-1;
       
   172 
       
   173                 if (elementPos >= 0) {
       
   174                     // If there is a security manager, make sure
       
   175                     // the caller is allowed to connect to the
       
   176                     // requested endpoint.
       
   177                     checkConnectPermission();
       
   178                     conn = freeList.get(elementPos);
       
   179                     freeList.remove(elementPos);
       
   180                 }
       
   181             }
       
   182 
       
   183             // at this point, conn is null iff the freelist is empty,
       
   184             // and nonnull if a free connection of uncertain vitality
       
   185             // has been found.
       
   186 
       
   187             if (conn != null) {
       
   188                 // check to see if the connection has closed since last use
       
   189                 if (!conn.isDead()) {
       
   190                     TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
       
   191                     return conn;
       
   192                 }
       
   193 
       
   194                 // conn is dead, and cannot be reused (reuse => false)
       
   195                 this.free(conn, false);
       
   196             }
       
   197         } while (conn != null);
       
   198 
       
   199         // none free, so create a new connection
       
   200         return (createConnection());
       
   201     }
       
   202 
       
   203     /**
       
   204      * Create a new connection to the remote endpoint of this channel.
       
   205      * The returned connection is new.  The caller must already have
       
   206      * passed a security checkConnect or equivalent.
       
   207      */
       
   208     private Connection createConnection() throws RemoteException {
       
   209         Connection conn;
       
   210 
       
   211         TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
       
   212 
       
   213         if (!usingMultiplexer) {
       
   214             Socket sock = ep.newSocket();
       
   215             conn = new TCPConnection(this, sock);
       
   216 
       
   217             try {
       
   218                 DataOutputStream out =
       
   219                     new DataOutputStream(conn.getOutputStream());
       
   220                 writeTransportHeader(out);
       
   221 
       
   222                 // choose protocol (single op if not reusable socket)
       
   223                 if (!conn.isReusable()) {
       
   224                     out.writeByte(TransportConstants.SingleOpProtocol);
       
   225                 } else {
       
   226                     out.writeByte(TransportConstants.StreamProtocol);
       
   227                     out.flush();
       
   228 
       
   229                     /*
       
   230                      * Set socket read timeout to configured value for JRMP
       
   231                      * connection handshake; this also serves to guard against
       
   232                      * non-JRMP servers that do not respond (see 4322806).
       
   233                      */
       
   234                     int originalSoTimeout = 0;
       
   235                     try {
       
   236                         originalSoTimeout = sock.getSoTimeout();
       
   237                         sock.setSoTimeout(handshakeTimeout);
       
   238                     } catch (Exception e) {
       
   239                         // if we fail to set this, ignore and proceed anyway
       
   240                     }
       
   241 
       
   242                     DataInputStream in =
       
   243                         new DataInputStream(conn.getInputStream());
       
   244                     byte ack = in.readByte();
       
   245                     if (ack != TransportConstants.ProtocolAck) {
       
   246                         throw new ConnectIOException(
       
   247                             ack == TransportConstants.ProtocolNack ?
       
   248                             "JRMP StreamProtocol not supported by server" :
       
   249                             "non-JRMP server at remote endpoint");
       
   250                     }
       
   251 
       
   252                     String suggestedHost = in.readUTF();
       
   253                     int    suggestedPort = in.readInt();
       
   254                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
       
   255                         TCPTransport.tcpLog.log(Log.VERBOSE,
       
   256                             "server suggested " + suggestedHost + ":" +
       
   257                             suggestedPort);
       
   258                     }
       
   259 
       
   260                     // set local host name, if unknown
       
   261                     TCPEndpoint.setLocalHost(suggestedHost);
       
   262                     // do NOT set the default port, because we don't
       
   263                     // know if we can't listen YET...
       
   264 
       
   265                     // write out default endpoint to match protocol
       
   266                     // (but it serves no purpose)
       
   267                     TCPEndpoint localEp =
       
   268                         TCPEndpoint.getLocalEndpoint(0, null, null);
       
   269                     out.writeUTF(localEp.getHost());
       
   270                     out.writeInt(localEp.getPort());
       
   271                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
       
   272                         TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
       
   273                             localEp.getHost() + ":" + localEp.getPort());
       
   274                     }
       
   275 
       
   276                     /*
       
   277                      * After JRMP handshake, set socket read timeout to value
       
   278                      * configured for the rest of the lifetime of the
       
   279                      * connection.  NOTE: this timeout, if configured to a
       
   280                      * finite duration, places an upper bound on the time
       
   281                      * that a remote method call is permitted to execute.
       
   282                      */
       
   283                     try {
       
   284                         /*
       
   285                          * If socket factory had set a non-zero timeout on its
       
   286                          * own, then restore it instead of using the property-
       
   287                          * configured value.
       
   288                          */
       
   289                         sock.setSoTimeout((originalSoTimeout != 0 ?
       
   290                                            originalSoTimeout :
       
   291                                            responseTimeout));
       
   292                     } catch (Exception e) {
       
   293                         // if we fail to set this, ignore and proceed anyway
       
   294                     }
       
   295 
       
   296                     out.flush();
       
   297                 }
       
   298             } catch (IOException e) {
       
   299                 if (e instanceof RemoteException)
       
   300                     throw (RemoteException) e;
       
   301                 else
       
   302                     throw new ConnectIOException(
       
   303                         "error during JRMP connection establishment", e);
       
   304             }
       
   305         } else {
       
   306             try {
       
   307                 conn = multiplexer.openConnection();
       
   308             } catch (IOException e) {
       
   309                 synchronized (this) {
       
   310                     usingMultiplexer = false;
       
   311                     multiplexer = null;
       
   312                 }
       
   313                 throw new ConnectIOException(
       
   314                     "error opening virtual connection " +
       
   315                     "over multiplexed connection", e);
       
   316             }
       
   317         }
       
   318         return conn;
       
   319     }
       
   320 
       
   321     /**
       
   322      * Free the connection generated by this channel.
       
   323      * @param conn The connection
       
   324      * @param reuse If true, the connection is in a state in which it
       
   325      *        can be reused for another method call.
       
   326      */
       
   327     public void free(Connection conn, boolean reuse) {
       
   328         if (conn == null) return;
       
   329 
       
   330         if (reuse && conn.isReusable()) {
       
   331             long lastuse = System.currentTimeMillis();
       
   332             TCPConnection tcpConnection = (TCPConnection) conn;
       
   333 
       
   334             TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
       
   335 
       
   336             /*
       
   337              * Cache connection; if reaper task for expired
       
   338              * connections isn't scheduled, then schedule it.
       
   339              */
       
   340             synchronized (freeList) {
       
   341                 freeList.add(tcpConnection);
       
   342                 if (reaper == null) {
       
   343                     TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
       
   344 
       
   345                     reaper = scheduler.scheduleWithFixedDelay(
       
   346                         new Runnable() {
       
   347                             public void run() {
       
   348                                 TCPTransport.tcpLog.log(Log.VERBOSE,
       
   349                                                         "wake up");
       
   350                                 freeCachedConnections();
       
   351                             }
       
   352                         }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
       
   353                 }
       
   354             }
       
   355 
       
   356             tcpConnection.setLastUseTime(lastuse);
       
   357             tcpConnection.setExpiration(lastuse + idleTimeout);
       
   358         } else {
       
   359             TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
       
   360 
       
   361             try {
       
   362                 conn.close();
       
   363             } catch (IOException ignored) {
       
   364             }
       
   365         }
       
   366     }
       
   367 
       
   368     /**
       
   369      * Send transport header over stream.
       
   370      */
       
   371     private void writeTransportHeader(DataOutputStream out)
       
   372         throws RemoteException
       
   373     {
       
   374         try {
       
   375             // write out transport header
       
   376             DataOutputStream dataOut =
       
   377                 new DataOutputStream(out);
       
   378             dataOut.writeInt(TransportConstants.Magic);
       
   379             dataOut.writeShort(TransportConstants.Version);
       
   380         } catch (IOException e) {
       
   381             throw new ConnectIOException(
       
   382                 "error writing JRMP transport header", e);
       
   383         }
       
   384     }
       
   385 
       
   386     /**
       
   387      * Use given connection multiplexer object to obtain new connections
       
   388      * through this channel.
       
   389      */
       
   390     synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
       
   391         // for now, always just use the last one given
       
   392         multiplexer = newMultiplexer;
       
   393 
       
   394         usingMultiplexer = true;
       
   395     }
       
   396 
       
   397     /**
       
   398      * Accept a connection provided over a multiplexed channel.
       
   399      */
       
   400     void acceptMultiplexConnection(Connection conn) {
       
   401         if (acceptor == null) {
       
   402             acceptor = new ConnectionAcceptor(tr);
       
   403             acceptor.startNewAcceptor();
       
   404         }
       
   405         acceptor.accept(conn);
       
   406     }
       
   407 
       
   408     /**
       
   409      * Closes all the connections in the cache, whether timed out or not.
       
   410      */
       
   411     public void shedCache() {
       
   412         // Build a list of connections, to avoid holding the freeList
       
   413         // lock during (potentially long-running) close() calls.
       
   414         Connection[] conn;
       
   415         synchronized (freeList) {
       
   416             conn = freeList.toArray(new Connection[freeList.size()]);
       
   417             freeList.clear();
       
   418         }
       
   419 
       
   420         // Close all the connections that were free
       
   421         for (int i = conn.length; --i >= 0; ) {
       
   422             Connection c = conn[i];
       
   423             conn[i] = null; // help gc
       
   424             try {
       
   425                 c.close();
       
   426             } catch (java.io.IOException e) {
       
   427                 // eat exception
       
   428             }
       
   429         }
       
   430     }
       
   431 
       
   432     private void freeCachedConnections() {
       
   433         /*
       
   434          * Remove each connection whose time out has expired.
       
   435          */
       
   436         synchronized (freeList) {
       
   437             int size = freeList.size();
       
   438 
       
   439             if (size > 0) {
       
   440                 long time = System.currentTimeMillis();
       
   441                 ListIterator<TCPConnection> iter = freeList.listIterator(size);
       
   442 
       
   443                 while (iter.hasPrevious()) {
       
   444                     TCPConnection conn = iter.previous();
       
   445                     if (conn.expired(time)) {
       
   446                         TCPTransport.tcpLog.log(Log.VERBOSE,
       
   447                             "connection timeout expired");
       
   448 
       
   449                         try {
       
   450                             conn.close();
       
   451                         } catch (java.io.IOException e) {
       
   452                             // eat exception
       
   453                         }
       
   454                         iter.remove();
       
   455                     }
       
   456                 }
       
   457             }
       
   458 
       
   459             if (freeList.isEmpty()) {
       
   460                 reaper.cancel(false);
       
   461                 reaper = null;
       
   462             }
       
   463         }
       
   464     }
       
   465 }
       
   466 
       
   467 /**
       
   468  * ConnectionAcceptor manages accepting new connections and giving them
       
   469  * to TCPTransport's message handler on new threads.
       
   470  *
       
   471  * Since this object only needs to know which transport to give new
       
   472  * connections to, it doesn't need to be per-channel as currently
       
   473  * implemented.
       
   474  */
       
   475 class ConnectionAcceptor implements Runnable {
       
   476 
       
   477     /** transport that will handle message on accepted connections */
       
   478     private TCPTransport transport;
       
   479 
       
   480     /** queue of connections to be accepted */
       
   481     private List<Connection> queue = new ArrayList<>();
       
   482 
       
   483     /** thread ID counter */
       
   484     private static int threadNum = 0;
       
   485 
       
   486     /**
       
   487      * Create a new ConnectionAcceptor that will give connections
       
   488      * to the specified transport on a new thread.
       
   489      */
       
   490     public ConnectionAcceptor(TCPTransport transport) {
       
   491         this.transport = transport;
       
   492     }
       
   493 
       
   494     /**
       
   495      * Start a new thread to accept connections.
       
   496      */
       
   497     public void startNewAcceptor() {
       
   498         Thread t = AccessController.doPrivileged(
       
   499             new NewThreadAction(ConnectionAcceptor.this,
       
   500                                 "Multiplex Accept-" + ++ threadNum,
       
   501                                 true));
       
   502         t.start();
       
   503     }
       
   504 
       
   505     /**
       
   506      * Add connection to queue of connections to be accepted.
       
   507      */
       
   508     public void accept(Connection conn) {
       
   509         synchronized (queue) {
       
   510             queue.add(conn);
       
   511             queue.notify();
       
   512         }
       
   513     }
       
   514 
       
   515     /**
       
   516      * Give transport next accepted connection, when available.
       
   517      */
       
   518     public void run() {
       
   519         Connection conn;
       
   520 
       
   521         synchronized (queue) {
       
   522             while (queue.size() == 0) {
       
   523                 try {
       
   524                     queue.wait();
       
   525                 } catch (InterruptedException e) {
       
   526                 }
       
   527             }
       
   528             startNewAcceptor();
       
   529             conn = queue.remove(0);
       
   530         }
       
   531 
       
   532         transport.handleMessages(conn, true);
       
   533     }
       
   534 }