8087189: RMI server-side multiplex protocol support should be removed
Reviewed-by: alanb
--- a/jdk/src/java.rmi/share/classes/sun/rmi/server/ActivatableRef.java Thu Aug 31 12:48:19 2017 -0700
+++ b/jdk/src/java.rmi/share/classes/sun/rmi/server/ActivatableRef.java Thu Aug 31 17:08:35 2017 -0400
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1997, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -158,8 +158,7 @@
exception = e;
} catch (ConnectIOException e) {
/*
- * Failure setting up multiplexed connection or reusing
- * cached connection; retry call
+ * Failure reusing cached connection; retry call
*/
exception = e;
} catch (MarshalException e) {
--- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java Thu Aug 31 12:48:19 2017 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,449 +0,0 @@
-/*
- * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package sun.rmi.transport.tcp;
-
-import java.io.*;
-import java.util.*;
-import java.rmi.server.LogStream;
-import java.security.PrivilegedAction;
-
-import sun.rmi.runtime.Log;
-
-/**
- * ConnectionMultiplexer manages the transparent multiplexing of
- * multiple virtual connections from one endpoint to another through
- * one given real connection to that endpoint. The input and output
- * streams for the underlying real connection must be supplied.
- * A callback object is also supplied to be informed of new virtual
- * connections opened by the remote endpoint. After creation, the
- * run() method must be called in a thread created for demultiplexing
- * the connections. The openConnection() method is called to
- * initiate a virtual connection from this endpoint.
- *
- * @author Peter Jones
- */
-@SuppressWarnings("deprecation")
-final class ConnectionMultiplexer {
-
- /** "multiplex" log level */
- static int logLevel = LogStream.parseLevel(getLogLevel());
-
- private static String getLogLevel() {
- return java.security.AccessController.doPrivileged(
- (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel"));
- }
-
- /* multiplex system log */
- static final Log multiplexLog =
- Log.getLog("sun.rmi.transport.tcp.multiplex",
- "multiplex", ConnectionMultiplexer.logLevel);
-
- /** multiplexing protocol operation codes */
- private final static int OPEN = 0xE1;
- private final static int CLOSE = 0xE2;
- private final static int CLOSEACK = 0xE3;
- private final static int REQUEST = 0xE4;
- private final static int TRANSMIT = 0xE5;
-
- /** object to notify for new connections from remote endpoint */
- private TCPChannel channel;
-
- /** input stream for underlying single connection */
- private InputStream in;
-
- /** output stream for underlying single connection */
- private OutputStream out;
-
- /** true if underlying connection originated from this endpoint
- (used for generating unique connection IDs) */
- private boolean orig;
-
- /** layered stream for reading formatted data from underlying connection */
- private DataInputStream dataIn;
-
- /** layered stream for writing formatted data to underlying connection */
- private DataOutputStream dataOut;
-
- /** table holding currently open connection IDs and related info */
- private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
-
- /** number of currently open connections */
- private int numConnections = 0;
-
- /** maximum allowed open connections */
- private final static int maxConnections = 256;
-
- /** ID of last connection opened */
- private int lastID = 0x1001;
-
- /** true if this mechanism is still alive */
- private boolean alive = true;
-
- /**
- * Create a new ConnectionMultiplexer using the given underlying
- * input/output stream pair. The run method must be called
- * (possibly on a new thread) to handle the demultiplexing.
- * @param channel object to notify when new connection is received
- * @param in input stream of underlying connection
- * @param out output stream of underlying connection
- * @param orig true if this endpoint intiated the underlying
- * connection (needs to be set differently at both ends)
- */
- public ConnectionMultiplexer(
- TCPChannel channel,
- InputStream in,
- OutputStream out,
- boolean orig)
- {
- this.channel = channel;
- this.in = in;
- this.out = out;
- this.orig = orig;
-
- dataIn = new DataInputStream(in);
- dataOut = new DataOutputStream(out);
- }
-
- /**
- * Process multiplexing protocol received from underlying connection.
- */
- public void run() throws IOException
- {
- try {
- int op, id, length;
- MultiplexConnectionInfo info;
-
- while (true) {
-
- // read next op code from remote endpoint
- op = dataIn.readUnsignedByte();
- switch (op) {
-
- // remote endpoint initiating new connection
- case OPEN:
- id = dataIn.readUnsignedShort();
-
- if (multiplexLog.isLoggable(Log.VERBOSE)) {
- multiplexLog.log(Log.VERBOSE, "operation OPEN " + id);
- }
-
- info = connectionTable.get(id);
- if (info != null)
- throw new IOException(
- "OPEN: Connection ID already exists");
- info = new MultiplexConnectionInfo(id);
- info.in = new MultiplexInputStream(this, info, 2048);
- info.out = new MultiplexOutputStream(this, info, 2048);
- synchronized (connectionTable) {
- connectionTable.put(id, info);
- ++ numConnections;
- }
- sun.rmi.transport.Connection conn;
- conn = new TCPConnection(channel, info.in, info.out);
- channel.acceptMultiplexConnection(conn);
- break;
-
- // remote endpoint closing connection
- case CLOSE:
- id = dataIn.readUnsignedShort();
-
- if (multiplexLog.isLoggable(Log.VERBOSE)) {
- multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id);
- }
-
- info = connectionTable.get(id);
- if (info == null)
- throw new IOException(
- "CLOSE: Invalid connection ID");
- info.in.disconnect();
- info.out.disconnect();
- if (!info.closed)
- sendCloseAck(info);
- synchronized (connectionTable) {
- connectionTable.remove(id);
- -- numConnections;
- }
- break;
-
- // remote endpoint acknowledging close of connection
- case CLOSEACK:
- id = dataIn.readUnsignedShort();
-
- if (multiplexLog.isLoggable(Log.VERBOSE)) {
- multiplexLog.log(Log.VERBOSE,
- "operation CLOSEACK " + id);
- }
-
- info = connectionTable.get(id);
- if (info == null)
- throw new IOException(
- "CLOSEACK: Invalid connection ID");
- if (!info.closed)
- throw new IOException(
- "CLOSEACK: Connection not closed");
- info.in.disconnect();
- info.out.disconnect();
- synchronized (connectionTable) {
- connectionTable.remove(id);
- -- numConnections;
- }
- break;
-
- // remote endpoint declaring additional bytes receivable
- case REQUEST:
- id = dataIn.readUnsignedShort();
- info = connectionTable.get(id);
- if (info == null)
- throw new IOException(
- "REQUEST: Invalid connection ID");
- length = dataIn.readInt();
-
- if (multiplexLog.isLoggable(Log.VERBOSE)) {
- multiplexLog.log(Log.VERBOSE,
- "operation REQUEST " + id + ": " + length);
- }
-
- info.out.request(length);
- break;
-
- // remote endpoint transmitting data packet
- case TRANSMIT:
- id = dataIn.readUnsignedShort();
- info = connectionTable.get(id);
- if (info == null)
- throw new IOException("SEND: Invalid connection ID");
- length = dataIn.readInt();
-
- if (multiplexLog.isLoggable(Log.VERBOSE)) {
- multiplexLog.log(Log.VERBOSE,
- "operation TRANSMIT " + id + ": " + length);
- }
-
- info.in.receive(length, dataIn);
- break;
-
- default:
- throw new IOException("Invalid operation: " +
- Integer.toHexString(op));
- }
- }
- } finally {
- shutDown();
- }
- }
-
- /**
- * Initiate a new multiplexed connection through the underlying
- * connection.
- */
- public synchronized TCPConnection openConnection() throws IOException
- {
- // generate ID that should not be already used
- // If all possible 32768 IDs are used,
- // this method will block searching for a new ID forever.
- int id;
- do {
- lastID = (++ lastID) & 0x7FFF;
- id = lastID;
-
- // The orig flag (copied to the high bit of the ID) is used
- // to have two distinct ranges to choose IDs from for the
- // two endpoints.
- if (orig)
- id |= 0x8000;
- } while (connectionTable.get(id) != null);
-
- // create multiplexing streams and bookkeeping information
- MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
- info.in = new MultiplexInputStream(this, info, 2048);
- info.out = new MultiplexOutputStream(this, info, 2048);
-
- // add to connection table if multiplexer has not died
- synchronized (connectionTable) {
- if (!alive)
- throw new IOException("Multiplexer connection dead");
- if (numConnections >= maxConnections)
- throw new IOException("Cannot exceed " + maxConnections +
- " simultaneous multiplexed connections");
- connectionTable.put(id, info);
- ++ numConnections;
- }
-
- // inform remote endpoint of new connection
- synchronized (dataOut) {
- try {
- dataOut.writeByte(OPEN);
- dataOut.writeShort(id);
- dataOut.flush();
- } catch (IOException e) {
- multiplexLog.log(Log.BRIEF, "exception: ", e);
-
- shutDown();
- throw e;
- }
- }
-
- return new TCPConnection(channel, info.in, info.out);
- }
-
- /**
- * Shut down all connections and clean up.
- */
- public void shutDown()
- {
- // inform all associated streams
- synchronized (connectionTable) {
- // return if multiplexer already officially dead
- if (!alive)
- return;
- alive = false;
-
- Enumeration<MultiplexConnectionInfo> enum_ =
- connectionTable.elements();
- while (enum_.hasMoreElements()) {
- MultiplexConnectionInfo info = enum_.nextElement();
- info.in.disconnect();
- info.out.disconnect();
- }
- connectionTable.clear();
- numConnections = 0;
- }
-
- // close underlying connection, if possible (and not already done)
- try {
- in.close();
- } catch (IOException e) {
- }
- try {
- out.close();
- } catch (IOException e) {
- }
- }
-
- /**
- * Send request for more data on connection to remote endpoint.
- * @param info connection information structure
- * @param len number of more bytes that can be received
- */
- void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
- {
- synchronized (dataOut) {
- if (alive && !info.closed)
- try {
- dataOut.writeByte(REQUEST);
- dataOut.writeShort(info.id);
- dataOut.writeInt(len);
- dataOut.flush();
- } catch (IOException e) {
- multiplexLog.log(Log.BRIEF, "exception: ", e);
-
- shutDown();
- throw e;
- }
- }
- }
-
- /**
- * Send packet of requested data on connection to remote endpoint.
- * @param info connection information structure
- * @param buf array containing bytes to send
- * @param off offset of first array index of packet
- * @param len number of bytes in packet to send
- */
- void sendTransmit(MultiplexConnectionInfo info,
- byte buf[], int off, int len) throws IOException
- {
- synchronized (dataOut) {
- if (alive && !info.closed)
- try {
- dataOut.writeByte(TRANSMIT);
- dataOut.writeShort(info.id);
- dataOut.writeInt(len);
- dataOut.write(buf, off, len);
- dataOut.flush();
- } catch (IOException e) {
- multiplexLog.log(Log.BRIEF, "exception: ", e);
-
- shutDown();
- throw e;
- }
- }
- }
-
- /**
- * Inform remote endpoint that connection has been closed.
- * @param info connection information structure
- */
- void sendClose(MultiplexConnectionInfo info) throws IOException
- {
- info.out.disconnect();
- synchronized (dataOut) {
- if (alive && !info.closed)
- try {
- dataOut.writeByte(CLOSE);
- dataOut.writeShort(info.id);
- dataOut.flush();
- info.closed = true;
- } catch (IOException e) {
- multiplexLog.log(Log.BRIEF, "exception: ", e);
-
- shutDown();
- throw e;
- }
- }
- }
-
- /**
- * Acknowledge remote endpoint's closing of connection.
- * @param info connection information structure
- */
- void sendCloseAck(MultiplexConnectionInfo info) throws IOException
- {
- synchronized (dataOut) {
- if (alive && !info.closed)
- try {
- dataOut.writeByte(CLOSEACK);
- dataOut.writeShort(info.id);
- dataOut.flush();
- info.closed = true;
- } catch (IOException e) {
- multiplexLog.log(Log.BRIEF, "exception: ", e);
-
- shutDown();
- throw e;
- }
- }
- }
-
- /**
- * Shut down connection upon finalization.
- */
- @SuppressWarnings("deprecation")
- protected void finalize() throws Throwable
- {
- super.finalize();
- shutDown();
- }
-}
--- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexConnectionInfo.java Thu Aug 31 12:48:19 2017 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 1996, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package sun.rmi.transport.tcp;
-
-/**
- * MultiplexConnectionInfo groups related information about a
- * virtual connection managed by a ConnectionMultiplexer object.
- *
- * @author Peter Jones
- */
-class MultiplexConnectionInfo {
-
- /** integer that uniquely identifies this connection */
- int id;
-
- /** input stream for reading from connection */
- MultiplexInputStream in = null;
-
- /** output stream for writing to connection */
- MultiplexOutputStream out = null;
-
- /** true if this connection has been closed */
- boolean closed = false;
-
- /**
- * Create information structure for given connection identifier.
- * @param id connection identifier
- */
- MultiplexConnectionInfo(int id)
- {
- this.id = id;
- }
-}
--- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java Thu Aug 31 12:48:19 2017 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,213 +0,0 @@
-/*
- * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package sun.rmi.transport.tcp;
-
-import java.io.*;
-
-/**
- * MultiplexInputStream manages receiving data over a connection managed
- * by a ConnectionMultiplexer object. This object is responsible for
- * requesting more bytes of data as space in its internal buffer becomes
- * available.
- *
- * @author Peter Jones
- */
-final class MultiplexInputStream extends InputStream {
-
- /** object managing multiplexed connection */
- private ConnectionMultiplexer manager;
-
- /** information about the connection this is the input stream for */
- private MultiplexConnectionInfo info;
-
- /** input buffer */
- private byte buffer[];
-
- /** number of real data bytes present in buffer */
- private int present = 0;
-
- /** current position to read from in input buffer */
- private int pos = 0;
-
- /** pending number of bytes this stream has requested */
- private int requested = 0;
-
- /** true if this connection has been disconnected */
- private boolean disconnected = false;
-
- /**
- * lock acquired to access shared variables:
- * buffer, present, pos, requested, & disconnected
- * WARNING: Any of the methods manager.send*() should not be
- * invoked while this lock is held, since they could potentially
- * block if the underlying connection's transport buffers are
- * full, and the manager may need to acquire this lock to process
- * and consume data coming over the underlying connection.
- */
- private Object lock = new Object();
-
- /** level at which more data is requested when read past */
- private int waterMark;
-
- /** data structure for holding reads of one byte */
- private byte temp[] = new byte[1];
-
- /**
- * Create a new MultiplexInputStream for the given manager.
- * @param manager object that manages this connection
- * @param info structure for connection this stream reads from
- * @param bufferLength length of input buffer
- */
- MultiplexInputStream(
- ConnectionMultiplexer manager,
- MultiplexConnectionInfo info,
- int bufferLength)
- {
- this.manager = manager;
- this.info = info;
-
- buffer = new byte[bufferLength];
- waterMark = bufferLength / 2;
- }
-
- /**
- * Read a byte from the connection.
- */
- public synchronized int read() throws IOException
- {
- int n = read(temp, 0, 1);
- if (n != 1)
- return -1;
- return temp[0] & 0xFF;
- }
-
- /**
- * Read a subarray of bytes from connection. This method blocks for
- * at least one byte, and it returns the number of bytes actually read,
- * or -1 if the end of the stream was detected.
- * @param b array to read bytes into
- * @param off offset of beginning of bytes to read into
- * @param len number of bytes to read
- */
- public synchronized int read(byte b[], int off, int len) throws IOException
- {
- if (len <= 0)
- return 0;
-
- int moreSpace;
- synchronized (lock) {
- if (pos >= present)
- pos = present = 0;
- else if (pos >= waterMark) {
- System.arraycopy(buffer, pos, buffer, 0, present - pos);
- present -= pos;
- pos = 0;
- }
- int freeSpace = buffer.length - present;
- moreSpace = Math.max(freeSpace - requested, 0);
- }
- if (moreSpace > 0)
- manager.sendRequest(info, moreSpace);
- synchronized (lock) {
- requested += moreSpace;
- while ((pos >= present) && !disconnected) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- }
- }
- if (disconnected && pos >= present)
- return -1;
-
- int available = present - pos;
- if (len < available) {
- System.arraycopy(buffer, pos, b, off, len);
- pos += len;
- return len;
- }
- else {
- System.arraycopy(buffer, pos, b, off, available);
- pos = present = 0;
- // could send another request here, if len > available??
- return available;
- }
- }
- }
-
- /**
- * Return the number of bytes immediately available for reading.
- */
- public int available() throws IOException
- {
- synchronized (lock) {
- return present - pos;
- }
- }
-
- /**
- * Close this connection.
- */
- public void close() throws IOException
- {
- manager.sendClose(info);
- }
-
- /**
- * Receive bytes transmitted from connection at remote endpoint.
- * @param length number of bytes transmitted
- * @param in input stream with those bytes ready to be read
- */
- void receive(int length, DataInputStream in)
- throws IOException
- {
- /* TO DO: Optimize so that data received from stream can be loaded
- * directly into user's buffer if there is a pending read().
- */
- synchronized (lock) {
- if ((pos > 0) && ((buffer.length - present) < length)) {
- System.arraycopy(buffer, pos, buffer, 0, present - pos);
- present -= pos;
- pos = 0;
- }
- if ((buffer.length - present) < length)
- throw new IOException("Receive buffer overflow");
- in.readFully(buffer, present, length);
- present += length;
- requested -= length;
- lock.notifyAll();
- }
- }
-
- /**
- * Disconnect this stream from all connection activity.
- */
- void disconnect()
- {
- synchronized (lock) {
- disconnected = true;
- lock.notifyAll();
- }
- }
-}
--- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexOutputStream.java Thu Aug 31 12:48:19 2017 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package sun.rmi.transport.tcp;
-
-import java.io.*;
-
-/**
- * MultiplexOutputStream manages sending data over a connection managed
- * by a ConnectionMultiplexer object. Data written is buffered until the
- * internal buffer is full or the flush() method is called, at which
- * point it attempts to push a packet of bytes through to the remote
- * endpoint. This will never push more bytes than the amount already
- * requested by the remote endpoint (to prevent receive buffer from
- * overflowing), so if the write() and flush() methods will block
- * until their operation can complete if enough bytes cannot be
- * pushed immediately.
- *
- * @author Peter Jones
- */
-final class MultiplexOutputStream extends OutputStream {
-
- /** object managing multiplexed connection */
- private ConnectionMultiplexer manager;
-
- /** information about the connection this is the output stream for */
- private MultiplexConnectionInfo info;
-
- /** output buffer */
- private byte buffer[];
-
- /** current position to write to in output buffer */
- private int pos = 0;
-
- /** pending number of bytes requested by remote endpoint */
- private int requested = 0;
-
- /** true if this connection has been disconnected */
- private boolean disconnected = false;
-
- /**
- * lock acquired to access shared variables:
- * requested & disconnected
- * WARNING: Any of the methods manager.send*() should not be
- * invoked while this lock is held, since they could potentially
- * block if the underlying connection's transport buffers are
- * full, and the manager may need to acquire this lock to process
- * and consume data coming over the underlying connection.
- */
- private Object lock = new Object();
-
- /**
- * Create a new MultiplexOutputStream for the given manager.
- * @param manager object that manages this connection
- * @param info structure for connection this stream writes to
- * @param bufferLength length of output buffer
- */
- MultiplexOutputStream(
- ConnectionMultiplexer manager,
- MultiplexConnectionInfo info,
- int bufferLength)
- {
- this.manager = manager;
- this.info = info;
-
- buffer = new byte[bufferLength];
- pos = 0;
- }
-
- /**
- * Write a byte over connection.
- * @param b byte of data to write
- */
- public synchronized void write(int b) throws IOException
- {
- while (pos >= buffer.length)
- push();
- buffer[pos ++] = (byte) b;
- }
-
- /**
- * Write a subarray of bytes over connection.
- * @param b array containing bytes to write
- * @param off offset of beginning of bytes to write
- * @param len number of bytes to write
- */
- public synchronized void write(byte b[], int off, int len)
- throws IOException
- {
- if (len <= 0)
- return;
-
- // if enough free space in output buffer, just copy into there
- int freeSpace = buffer.length - pos;
- if (len <= freeSpace) {
- System.arraycopy(b, off, buffer, pos, len);
- pos += len;
- return;
- }
-
- // else, flush buffer and send rest directly to avoid array copy
- flush();
- int local_requested;
- while (true) {
- synchronized (lock) {
- while ((local_requested = requested) < 1 && !disconnected) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- }
- }
- if (disconnected)
- throw new IOException("Connection closed");
- }
-
- if (local_requested < len) {
- manager.sendTransmit(info, b, off, local_requested);
- off += local_requested;
- len -= local_requested;
- synchronized (lock) {
- requested -= local_requested;
- }
- }
- else {
- manager.sendTransmit(info, b, off, len);
- synchronized (lock) {
- requested -= len;
- }
- // len = 0;
- break;
- }
- }
- }
-
- /**
- * Guarantee that all data written to this stream has been pushed
- * over and made available to the remote endpoint.
- */
- public synchronized void flush() throws IOException {
- while (pos > 0)
- push();
- }
-
- /**
- * Close this connection.
- */
- public void close() throws IOException
- {
- manager.sendClose(info);
- }
-
- /**
- * Take note of more bytes requested by connection at remote endpoint.
- * @param num number of additional bytes requested
- */
- void request(int num)
- {
- synchronized (lock) {
- requested += num;
- lock.notifyAll();
- }
- }
-
- /**
- * Disconnect this stream from all connection activity.
- */
- void disconnect()
- {
- synchronized (lock) {
- disconnected = true;
- lock.notifyAll();
- }
- }
-
- /**
- * Push bytes in output buffer to connection at remote endpoint.
- * This method blocks until at least one byte has been pushed across.
- */
- private void push() throws IOException
- {
- int local_requested;
- synchronized (lock) {
- while ((local_requested = requested) < 1 && !disconnected) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- }
- }
- if (disconnected)
- throw new IOException("Connection closed");
- }
-
- if (local_requested < pos) {
- manager.sendTransmit(info, buffer, 0, local_requested);
- System.arraycopy(buffer, local_requested,
- buffer, 0, pos - local_requested);
- pos -= local_requested;
- synchronized (lock) {
- requested -= local_requested;
- }
- }
- else {
- manager.sendTransmit(info, buffer, 0, pos);
- synchronized (lock) {
- requested -= pos;
- }
- pos = 0;
- }
- }
-}
--- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java Thu Aug 31 12:48:19 2017 -0700
+++ b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPChannel.java Thu Aug 31 17:08:35 2017 -0400
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -67,10 +67,6 @@
/** frees cached connections that have expired (guarded by freeList) */
private Future<?> reaper = null;
- /** using multiplexer (for bi-directional applet communication */
- private boolean usingMultiplexer = false;
- /** connection multiplexer, if used */
- private ConnectionMultiplexer multiplexer = null;
/** connection acceptor (should be in TCPTransport) */
private ConnectionAcceptor acceptor;
@@ -210,113 +206,99 @@
TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
- if (!usingMultiplexer) {
- Socket sock = ep.newSocket();
- conn = new TCPConnection(this, sock);
+ Socket sock = ep.newSocket();
+ conn = new TCPConnection(this, sock);
- try {
- DataOutputStream out =
- new DataOutputStream(conn.getOutputStream());
- writeTransportHeader(out);
+ try {
+ DataOutputStream out =
+ new DataOutputStream(conn.getOutputStream());
+ writeTransportHeader(out);
- // choose protocol (single op if not reusable socket)
- if (!conn.isReusable()) {
- out.writeByte(TransportConstants.SingleOpProtocol);
- } else {
- out.writeByte(TransportConstants.StreamProtocol);
- out.flush();
+ // choose protocol (single op if not reusable socket)
+ if (!conn.isReusable()) {
+ out.writeByte(TransportConstants.SingleOpProtocol);
+ } else {
+ out.writeByte(TransportConstants.StreamProtocol);
+ out.flush();
- /*
- * Set socket read timeout to configured value for JRMP
- * connection handshake; this also serves to guard against
- * non-JRMP servers that do not respond (see 4322806).
- */
- int originalSoTimeout = 0;
- try {
- originalSoTimeout = sock.getSoTimeout();
- sock.setSoTimeout(handshakeTimeout);
- } catch (Exception e) {
- // if we fail to set this, ignore and proceed anyway
- }
+ /*
+ * Set socket read timeout to configured value for JRMP
+ * connection handshake; this also serves to guard against
+ * non-JRMP servers that do not respond (see 4322806).
+ */
+ int originalSoTimeout = 0;
+ try {
+ originalSoTimeout = sock.getSoTimeout();
+ sock.setSoTimeout(handshakeTimeout);
+ } catch (Exception e) {
+ // if we fail to set this, ignore and proceed anyway
+ }
- DataInputStream in =
- new DataInputStream(conn.getInputStream());
- byte ack = in.readByte();
- if (ack != TransportConstants.ProtocolAck) {
- throw new ConnectIOException(
- ack == TransportConstants.ProtocolNack ?
- "JRMP StreamProtocol not supported by server" :
- "non-JRMP server at remote endpoint");
- }
+ DataInputStream in =
+ new DataInputStream(conn.getInputStream());
+ byte ack = in.readByte();
+ if (ack != TransportConstants.ProtocolAck) {
+ throw new ConnectIOException(
+ ack == TransportConstants.ProtocolNack ?
+ "JRMP StreamProtocol not supported by server" :
+ "non-JRMP server at remote endpoint");
+ }
- String suggestedHost = in.readUTF();
- int suggestedPort = in.readInt();
- if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
- TCPTransport.tcpLog.log(Log.VERBOSE,
- "server suggested " + suggestedHost + ":" +
- suggestedPort);
- }
-
- // set local host name, if unknown
- TCPEndpoint.setLocalHost(suggestedHost);
- // do NOT set the default port, because we don't
- // know if we can't listen YET...
+ String suggestedHost = in.readUTF();
+ int suggestedPort = in.readInt();
+ if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
+ TCPTransport.tcpLog.log(Log.VERBOSE,
+ "server suggested " + suggestedHost + ":" +
+ suggestedPort);
+ }
- // write out default endpoint to match protocol
- // (but it serves no purpose)
- TCPEndpoint localEp =
- TCPEndpoint.getLocalEndpoint(0, null, null);
- out.writeUTF(localEp.getHost());
- out.writeInt(localEp.getPort());
- if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
- TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
- localEp.getHost() + ":" + localEp.getPort());
- }
+ // set local host name, if unknown
+ TCPEndpoint.setLocalHost(suggestedHost);
+ // do NOT set the default port, because we don't
+ // know if we can't listen YET...
+ // write out default endpoint to match protocol
+ // (but it serves no purpose)
+ TCPEndpoint localEp =
+ TCPEndpoint.getLocalEndpoint(0, null, null);
+ out.writeUTF(localEp.getHost());
+ out.writeInt(localEp.getPort());
+ if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
+ TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
+ localEp.getHost() + ":" + localEp.getPort());
+ }
+
+ /*
+ * After JRMP handshake, set socket read timeout to value
+ * configured for the rest of the lifetime of the
+ * connection. NOTE: this timeout, if configured to a
+ * finite duration, places an upper bound on the time
+ * that a remote method call is permitted to execute.
+ */
+ try {
/*
- * After JRMP handshake, set socket read timeout to value
- * configured for the rest of the lifetime of the
- * connection. NOTE: this timeout, if configured to a
- * finite duration, places an upper bound on the time
- * that a remote method call is permitted to execute.
+ * If socket factory had set a non-zero timeout on its
+ * own, then restore it instead of using the property-
+ * configured value.
*/
- try {
- /*
- * If socket factory had set a non-zero timeout on its
- * own, then restore it instead of using the property-
- * configured value.
- */
- sock.setSoTimeout((originalSoTimeout != 0 ?
- originalSoTimeout :
- responseTimeout));
- } catch (Exception e) {
- // if we fail to set this, ignore and proceed anyway
- }
-
- out.flush();
+ sock.setSoTimeout((originalSoTimeout != 0 ?
+ originalSoTimeout :
+ responseTimeout));
+ } catch (Exception e) {
+ // if we fail to set this, ignore and proceed anyway
}
- } catch (IOException e) {
- try {
- conn.close();
- } catch (Exception ex) {}
- if (e instanceof RemoteException) {
- throw (RemoteException) e;
- } else {
- throw new ConnectIOException(
- "error during JRMP connection establishment", e);
- }
+
+ out.flush();
}
- } else {
+ } catch (IOException e) {
try {
- conn = multiplexer.openConnection();
- } catch (IOException e) {
- synchronized (this) {
- usingMultiplexer = false;
- multiplexer = null;
- }
+ conn.close();
+ } catch (Exception ex) {}
+ if (e instanceof RemoteException) {
+ throw (RemoteException) e;
+ } else {
throw new ConnectIOException(
- "error opening virtual connection " +
- "over multiplexed connection", e);
+ "error during JRMP connection establishment", e);
}
}
return conn;
@@ -388,28 +370,6 @@
}
/**
- * Use given connection multiplexer object to obtain new connections
- * through this channel.
- */
- synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
- // for now, always just use the last one given
- multiplexer = newMultiplexer;
-
- usingMultiplexer = true;
- }
-
- /**
- * Accept a connection provided over a multiplexed channel.
- */
- void acceptMultiplexConnection(Connection conn) {
- if (acceptor == null) {
- acceptor = new ConnectionAcceptor(tr);
- acceptor.startNewAcceptor();
- }
- acceptor.accept(conn);
- }
-
- /**
* Closes all the connections in the cache, whether timed out or not.
*/
public void shedCache() {
@@ -501,7 +461,7 @@
public void startNewAcceptor() {
Thread t = AccessController.doPrivileged(
new NewThreadAction(ConnectionAcceptor.this,
- "Multiplex Accept-" + ++ threadNum,
+ "TCPChannel Accept-" + ++ threadNum,
true));
t.start();
}
--- a/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPTransport.java Thu Aug 31 12:48:19 2017 -0700
+++ b/jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/TCPTransport.java Thu Aug 31 17:08:35 2017 -0400
@@ -102,11 +102,6 @@
AccessController.doPrivileged((PrivilegedAction<Long>) () ->
Long.getLong("sun.rmi.transport.tcp.threadKeepAliveTime", 60000));
- /** enable multiplexing protocol */
- private static final boolean enableMultiplexProtocol = // default false
- AccessController.doPrivileged((PrivilegedAction<Boolean>) () ->
- Boolean.getBoolean("sun.rmi.transport.tcp.enableMultiplexProtocol"));
-
/** thread pool for connection handlers */
private static final ExecutorService connectionThreadPool =
new ThreadPoolExecutor(0, maxConnectionThreads,
@@ -687,6 +682,7 @@
}
}
+ @SuppressWarnings("fallthrough")
private void run0() {
TCPEndpoint endpoint = getEndpoint();
int port = endpoint.getPort();
@@ -801,59 +797,11 @@
break;
case TransportConstants.MultiplexProtocol:
-
- if (!enableMultiplexProtocol) {
- if (tcpLog.isLoggable(Log.VERBOSE)) {
- tcpLog.log(Log.VERBOSE, "(port " + port +
- ") rejecting multiplex protocol");
- }
-
- // If MultiplexProtocol is disabled, send NACK immediately.
- out.writeByte(TransportConstants.ProtocolNack);
- out.flush();
- break;
- }
-
- if (tcpLog.isLoggable(Log.VERBOSE)) {
- tcpLog.log(Log.VERBOSE, "(port " + port +
- ") accepting multiplex protocol");
- }
-
- // send ack
- out.writeByte(TransportConstants.ProtocolAck);
-
- // suggest endpoint (in case client doesn't already have one)
if (tcpLog.isLoggable(Log.VERBOSE)) {
tcpLog.log(Log.VERBOSE, "(port " + port +
- ") suggesting " + remoteHost + ":" + remotePort);
+ ") rejecting multiplex protocol");
}
-
- out.writeUTF(remoteHost);
- out.writeInt(remotePort);
- out.flush();
-
- // read endpoint client has decided to use
- ep = new TCPEndpoint(in.readUTF(), in.readInt(),
- endpoint.getClientSocketFactory(),
- endpoint.getServerSocketFactory());
- if (tcpLog.isLoggable(Log.VERBOSE)) {
- tcpLog.log(Log.VERBOSE, "(port " +
- port + ") client using " +
- ep.getHost() + ":" + ep.getPort());
- }
-
- ConnectionMultiplexer multiplexer;
- synchronized (channelTable) {
- // create or find channel for this endpoint
- ch = getChannel(ep);
- multiplexer =
- new ConnectionMultiplexer(ch, bufIn, sockOut,
- false);
- ch.useMultiplexer(multiplexer);
- }
- multiplexer.run();
- break;
-
+ // Fall-through to reject use of MultiplexProtocol
default:
// protocol not understood, send nack and close socket
out.writeByte(TransportConstants.ProtocolNack);