jdk/src/java.rmi/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java
changeset 47199 5b4ba31ce49b
parent 47198 898607275d6e
parent 47125 46ab150d59cd
child 47200 618e6ae80417
equal deleted inserted replaced
47198:898607275d6e 47199:5b4ba31ce49b
     1 /*
       
     2  * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package sun.rmi.transport.tcp;
       
    26 
       
    27 import java.io.*;
       
    28 
       
    29 /**
       
    30  * MultiplexInputStream manages receiving data over a connection managed
       
    31  * by a ConnectionMultiplexer object.  This object is responsible for
       
    32  * requesting more bytes of data as space in its internal buffer becomes
       
    33  * available.
       
    34  *
       
    35  * @author Peter Jones
       
    36  */
       
    37 final class MultiplexInputStream extends InputStream {
       
    38 
       
    39     /** object managing multiplexed connection */
       
    40     private ConnectionMultiplexer manager;
       
    41 
       
    42     /** information about the connection this is the input stream for */
       
    43     private MultiplexConnectionInfo info;
       
    44 
       
    45     /** input buffer */
       
    46     private byte buffer[];
       
    47 
       
    48     /** number of real data bytes present in buffer */
       
    49     private int present = 0;
       
    50 
       
    51     /** current position to read from in input buffer */
       
    52     private int pos = 0;
       
    53 
       
    54     /** pending number of bytes this stream has requested */
       
    55     private int requested = 0;
       
    56 
       
    57     /** true if this connection has been disconnected */
       
    58     private boolean disconnected = false;
       
    59 
       
    60     /**
       
    61      * lock acquired to access shared variables:
       
    62      * buffer, present, pos, requested, & disconnected
       
    63      * WARNING:  Any of the methods manager.send*() should not be
       
    64      * invoked while this lock is held, since they could potentially
       
    65      * block if the underlying connection's transport buffers are
       
    66      * full, and the manager may need to acquire this lock to process
       
    67      * and consume data coming over the underlying connection.
       
    68      */
       
    69     private Object lock = new Object();
       
    70 
       
    71     /** level at which more data is requested when read past */
       
    72     private int waterMark;
       
    73 
       
    74     /** data structure for holding reads of one byte */
       
    75     private byte temp[] = new byte[1];
       
    76 
       
    77     /**
       
    78      * Create a new MultiplexInputStream for the given manager.
       
    79      * @param manager object that manages this connection
       
    80      * @param info structure for connection this stream reads from
       
    81      * @param bufferLength length of input buffer
       
    82      */
       
    83     MultiplexInputStream(
       
    84         ConnectionMultiplexer    manager,
       
    85         MultiplexConnectionInfo  info,
       
    86         int                      bufferLength)
       
    87     {
       
    88         this.manager = manager;
       
    89         this.info    = info;
       
    90 
       
    91         buffer = new byte[bufferLength];
       
    92         waterMark = bufferLength / 2;
       
    93     }
       
    94 
       
    95     /**
       
    96      * Read a byte from the connection.
       
    97      */
       
    98     public synchronized int read() throws IOException
       
    99     {
       
   100         int n = read(temp, 0, 1);
       
   101         if (n != 1)
       
   102             return -1;
       
   103         return temp[0] & 0xFF;
       
   104     }
       
   105 
       
   106     /**
       
   107      * Read a subarray of bytes from connection.  This method blocks for
       
   108      * at least one byte, and it returns the number of bytes actually read,
       
   109      * or -1 if the end of the stream was detected.
       
   110      * @param b array to read bytes into
       
   111      * @param off offset of beginning of bytes to read into
       
   112      * @param len number of bytes to read
       
   113      */
       
   114     public synchronized int read(byte b[], int off, int len) throws IOException
       
   115     {
       
   116         if (len <= 0)
       
   117             return 0;
       
   118 
       
   119         int moreSpace;
       
   120         synchronized (lock) {
       
   121             if (pos >= present)
       
   122                 pos = present = 0;
       
   123             else if (pos >= waterMark) {
       
   124                 System.arraycopy(buffer, pos, buffer, 0, present - pos);
       
   125                 present -= pos;
       
   126                 pos = 0;
       
   127             }
       
   128             int freeSpace = buffer.length - present;
       
   129             moreSpace = Math.max(freeSpace - requested, 0);
       
   130         }
       
   131         if (moreSpace > 0)
       
   132             manager.sendRequest(info, moreSpace);
       
   133         synchronized (lock) {
       
   134             requested += moreSpace;
       
   135             while ((pos >= present) && !disconnected) {
       
   136                 try {
       
   137                     lock.wait();
       
   138                 } catch (InterruptedException e) {
       
   139                 }
       
   140             }
       
   141             if (disconnected && pos >= present)
       
   142                 return -1;
       
   143 
       
   144             int available = present - pos;
       
   145             if (len < available) {
       
   146                 System.arraycopy(buffer, pos, b, off, len);
       
   147                 pos += len;
       
   148                 return len;
       
   149             }
       
   150             else {
       
   151                 System.arraycopy(buffer, pos, b, off, available);
       
   152                 pos = present = 0;
       
   153                 // could send another request here, if len > available??
       
   154                 return available;
       
   155             }
       
   156         }
       
   157     }
       
   158 
       
   159     /**
       
   160      * Return the number of bytes immediately available for reading.
       
   161      */
       
   162     public int available() throws IOException
       
   163     {
       
   164         synchronized (lock) {
       
   165             return present - pos;
       
   166         }
       
   167     }
       
   168 
       
   169     /**
       
   170      * Close this connection.
       
   171      */
       
   172     public void close() throws IOException
       
   173     {
       
   174         manager.sendClose(info);
       
   175     }
       
   176 
       
   177     /**
       
   178      * Receive bytes transmitted from connection at remote endpoint.
       
   179      * @param length number of bytes transmitted
       
   180      * @param in input stream with those bytes ready to be read
       
   181      */
       
   182     void receive(int length, DataInputStream in)
       
   183         throws IOException
       
   184     {
       
   185         /* TO DO: Optimize so that data received from stream can be loaded
       
   186          * directly into user's buffer if there is a pending read().
       
   187          */
       
   188         synchronized (lock) {
       
   189             if ((pos > 0) && ((buffer.length - present) < length)) {
       
   190                 System.arraycopy(buffer, pos, buffer, 0, present - pos);
       
   191                 present -= pos;
       
   192                 pos = 0;
       
   193             }
       
   194             if ((buffer.length - present) < length)
       
   195                 throw new IOException("Receive buffer overflow");
       
   196             in.readFully(buffer, present, length);
       
   197             present += length;
       
   198             requested -= length;
       
   199             lock.notifyAll();
       
   200         }
       
   201     }
       
   202 
       
   203     /**
       
   204      * Disconnect this stream from all connection activity.
       
   205      */
       
   206     void disconnect()
       
   207     {
       
   208         synchronized (lock) {
       
   209             disconnected = true;
       
   210             lock.notifyAll();
       
   211         }
       
   212     }
       
   213 }