jdk/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java
author xdono
Wed, 02 Jul 2008 12:55:45 -0700
changeset 715 f16baef3a20e
parent 2 90ce3da70b43
child 5506 202f599c92aa
permissions -rw-r--r--
6719955: Update copyright year Summary: Update copyright year for files that have been modified in 2008 Reviewed-by: ohair, tbell
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     1
/*
90ce3da70b43 Initial load
duke
parents:
diff changeset
     2
 * Copyright 1996-1997 Sun Microsystems, Inc.  All Rights Reserved.
90ce3da70b43 Initial load
duke
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
90ce3da70b43 Initial load
duke
parents:
diff changeset
     4
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
90ce3da70b43 Initial load
duke
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
90ce3da70b43 Initial load
duke
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Sun designates this
90ce3da70b43 Initial load
duke
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
90ce3da70b43 Initial load
duke
parents:
diff changeset
     9
 * by Sun in the LICENSE file that accompanied this code.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    10
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
90ce3da70b43 Initial load
duke
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
90ce3da70b43 Initial load
duke
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
90ce3da70b43 Initial load
duke
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    15
 * accompanied this code).
90ce3da70b43 Initial load
duke
parents:
diff changeset
    16
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
90ce3da70b43 Initial load
duke
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    20
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    21
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    22
 * CA 95054 USA or visit www.sun.com if you need additional information or
90ce3da70b43 Initial load
duke
parents:
diff changeset
    23
 * have any questions.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    24
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    25
package sun.rmi.transport.tcp;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    26
90ce3da70b43 Initial load
duke
parents:
diff changeset
    27
import java.io.*;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    28
90ce3da70b43 Initial load
duke
parents:
diff changeset
    29
/**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    30
 * MultiplexInputStream manages receiving data over a connection managed
90ce3da70b43 Initial load
duke
parents:
diff changeset
    31
 * by a ConnectionMultiplexer object.  This object is responsible for
90ce3da70b43 Initial load
duke
parents:
diff changeset
    32
 * requesting more bytes of data as space in its internal buffer becomes
90ce3da70b43 Initial load
duke
parents:
diff changeset
    33
 * available.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    34
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    35
 * @author Peter Jones
90ce3da70b43 Initial load
duke
parents:
diff changeset
    36
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    37
final class MultiplexInputStream extends InputStream {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    38
90ce3da70b43 Initial load
duke
parents:
diff changeset
    39
    /** object managing multiplexed connection */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    40
    private ConnectionMultiplexer manager;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    41
90ce3da70b43 Initial load
duke
parents:
diff changeset
    42
    /** information about the connection this is the input stream for */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    43
    private MultiplexConnectionInfo info;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    44
90ce3da70b43 Initial load
duke
parents:
diff changeset
    45
    /** input buffer */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    46
    private byte buffer[];
90ce3da70b43 Initial load
duke
parents:
diff changeset
    47
90ce3da70b43 Initial load
duke
parents:
diff changeset
    48
    /** number of real data bytes present in buffer */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    49
    private int present = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    50
90ce3da70b43 Initial load
duke
parents:
diff changeset
    51
    /** current position to read from in input buffer */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    52
    private int pos = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    53
90ce3da70b43 Initial load
duke
parents:
diff changeset
    54
    /** pending number of bytes this stream has requested */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    55
    private int requested = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    56
90ce3da70b43 Initial load
duke
parents:
diff changeset
    57
    /** true if this connection has been disconnected */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    58
    private boolean disconnected = false;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    59
90ce3da70b43 Initial load
duke
parents:
diff changeset
    60
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    61
     * lock acquired to access shared variables:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    62
     * buffer, present, pos, requested, & disconnected
90ce3da70b43 Initial load
duke
parents:
diff changeset
    63
     * WARNING:  Any of the methods manager.send*() should not be
90ce3da70b43 Initial load
duke
parents:
diff changeset
    64
     * invoked while this lock is held, since they could potentially
90ce3da70b43 Initial load
duke
parents:
diff changeset
    65
     * block if the underlying connection's transport buffers are
90ce3da70b43 Initial load
duke
parents:
diff changeset
    66
     * full, and the manager may need to acquire this lock to process
90ce3da70b43 Initial load
duke
parents:
diff changeset
    67
     * and consume data coming over the underlying connection.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    68
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    69
    private Object lock = new Object();
90ce3da70b43 Initial load
duke
parents:
diff changeset
    70
90ce3da70b43 Initial load
duke
parents:
diff changeset
    71
    /** level at which more data is requested when read past */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    72
    private int waterMark;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    73
90ce3da70b43 Initial load
duke
parents:
diff changeset
    74
    /** data structure for holding reads of one byte */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    75
    private byte temp[] = new byte[1];
90ce3da70b43 Initial load
duke
parents:
diff changeset
    76
90ce3da70b43 Initial load
duke
parents:
diff changeset
    77
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    78
     * Create a new MultiplexInputStream for the given manager.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    79
     * @param manager object that manages this connection
90ce3da70b43 Initial load
duke
parents:
diff changeset
    80
     * @param info structure for connection this stream reads from
90ce3da70b43 Initial load
duke
parents:
diff changeset
    81
     * @param bufferLength length of input buffer
90ce3da70b43 Initial load
duke
parents:
diff changeset
    82
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    83
    MultiplexInputStream(
90ce3da70b43 Initial load
duke
parents:
diff changeset
    84
        ConnectionMultiplexer    manager,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    85
        MultiplexConnectionInfo  info,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    86
        int                      bufferLength)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    87
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    88
        this.manager = manager;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    89
        this.info    = info;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    90
90ce3da70b43 Initial load
duke
parents:
diff changeset
    91
        buffer = new byte[bufferLength];
90ce3da70b43 Initial load
duke
parents:
diff changeset
    92
        waterMark = bufferLength / 2;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    93
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    94
90ce3da70b43 Initial load
duke
parents:
diff changeset
    95
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    96
     * Read a byte from the connection.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    97
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    98
    public synchronized int read() throws IOException
90ce3da70b43 Initial load
duke
parents:
diff changeset
    99
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   100
        int n = read(temp, 0, 1);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   101
        if (n != 1)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   102
            return -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   103
        return temp[0] & 0xFF;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   104
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   105
90ce3da70b43 Initial load
duke
parents:
diff changeset
   106
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   107
     * Read a subarray of bytes from connection.  This method blocks for
90ce3da70b43 Initial load
duke
parents:
diff changeset
   108
     * at least one byte, and it returns the number of bytes actually read,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   109
     * or -1 if the end of the stream was detected.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   110
     * @param b array to read bytes into
90ce3da70b43 Initial load
duke
parents:
diff changeset
   111
     * @param off offset of beginning of bytes to read into
90ce3da70b43 Initial load
duke
parents:
diff changeset
   112
     * @param len number of bytes to read
90ce3da70b43 Initial load
duke
parents:
diff changeset
   113
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   114
    public synchronized int read(byte b[], int off, int len) throws IOException
90ce3da70b43 Initial load
duke
parents:
diff changeset
   115
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   116
        if (len <= 0)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   117
            return 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   118
90ce3da70b43 Initial load
duke
parents:
diff changeset
   119
        int moreSpace;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   120
        synchronized (lock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   121
            if (pos >= present)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   122
                pos = present = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   123
            else if (pos >= waterMark) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   124
                System.arraycopy(buffer, pos, buffer, 0, present - pos);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   125
                present -= pos;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   126
                pos = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   127
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   128
            int freeSpace = buffer.length - present;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   129
            moreSpace = Math.max(freeSpace - requested, 0);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   130
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   131
        if (moreSpace > 0)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   132
            manager.sendRequest(info, moreSpace);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   133
        synchronized (lock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   134
            requested += moreSpace;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   135
            while ((pos >= present) && !disconnected) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   136
                try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   137
                    lock.wait();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   138
                } catch (InterruptedException e) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   139
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   140
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   141
            if (disconnected && pos >= present)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   142
                return -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   143
90ce3da70b43 Initial load
duke
parents:
diff changeset
   144
            int available = present - pos;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   145
            if (len < available) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   146
                System.arraycopy(buffer, pos, b, off, len);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   147
                pos += len;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   148
                return len;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   149
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   150
            else {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   151
                System.arraycopy(buffer, pos, b, off, available);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   152
                pos = present = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   153
                // could send another request here, if len > available??
90ce3da70b43 Initial load
duke
parents:
diff changeset
   154
                return available;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   155
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   156
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   157
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   158
90ce3da70b43 Initial load
duke
parents:
diff changeset
   159
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   160
     * Return the number of bytes immediately available for reading.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   161
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   162
    public int available() throws IOException
90ce3da70b43 Initial load
duke
parents:
diff changeset
   163
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   164
        synchronized (lock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   165
            return present - pos;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   166
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   167
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   168
90ce3da70b43 Initial load
duke
parents:
diff changeset
   169
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   170
     * Close this connection.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   171
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   172
    public void close() throws IOException
90ce3da70b43 Initial load
duke
parents:
diff changeset
   173
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   174
        manager.sendClose(info);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   175
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   176
90ce3da70b43 Initial load
duke
parents:
diff changeset
   177
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   178
     * Receive bytes transmitted from connection at remote endpoint.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   179
     * @param length number of bytes transmitted
90ce3da70b43 Initial load
duke
parents:
diff changeset
   180
     * @param in input stream with those bytes ready to be read
90ce3da70b43 Initial load
duke
parents:
diff changeset
   181
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   182
    void receive(int length, DataInputStream in)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   183
        throws IOException
90ce3da70b43 Initial load
duke
parents:
diff changeset
   184
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   185
        /* TO DO: Optimize so that data received from stream can be loaded
90ce3da70b43 Initial load
duke
parents:
diff changeset
   186
         * directly into user's buffer if there is a pending read().
90ce3da70b43 Initial load
duke
parents:
diff changeset
   187
         */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   188
        synchronized (lock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   189
            if ((pos > 0) && ((buffer.length - present) < length)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   190
                System.arraycopy(buffer, pos, buffer, 0, present - pos);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   191
                present -= pos;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   192
                pos = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   193
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   194
            if ((buffer.length - present) < length)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   195
                throw new IOException("Receive buffer overflow");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   196
            in.readFully(buffer, present, length);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   197
            present += length;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   198
            requested -= length;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   199
            lock.notifyAll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   200
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   201
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   202
90ce3da70b43 Initial load
duke
parents:
diff changeset
   203
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   204
     * Disconnect this stream from all connection activity.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   205
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   206
    void disconnect()
90ce3da70b43 Initial load
duke
parents:
diff changeset
   207
    {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   208
        synchronized (lock) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   209
            disconnected = true;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   210
            lock.notifyAll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   211
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   212
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   213
}