1 /* |
|
2 * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 package sun.rmi.transport.tcp; |
|
26 |
|
27 import java.io.*; |
|
28 |
|
29 /** |
|
30 * MultiplexInputStream manages receiving data over a connection managed |
|
31 * by a ConnectionMultiplexer object. This object is responsible for |
|
32 * requesting more bytes of data as space in its internal buffer becomes |
|
33 * available. |
|
34 * |
|
35 * @author Peter Jones |
|
36 */ |
|
37 final class MultiplexInputStream extends InputStream { |
|
38 |
|
39 /** object managing multiplexed connection */ |
|
40 private ConnectionMultiplexer manager; |
|
41 |
|
42 /** information about the connection this is the input stream for */ |
|
43 private MultiplexConnectionInfo info; |
|
44 |
|
45 /** input buffer */ |
|
46 private byte buffer[]; |
|
47 |
|
48 /** number of real data bytes present in buffer */ |
|
49 private int present = 0; |
|
50 |
|
51 /** current position to read from in input buffer */ |
|
52 private int pos = 0; |
|
53 |
|
54 /** pending number of bytes this stream has requested */ |
|
55 private int requested = 0; |
|
56 |
|
57 /** true if this connection has been disconnected */ |
|
58 private boolean disconnected = false; |
|
59 |
|
60 /** |
|
61 * lock acquired to access shared variables: |
|
62 * buffer, present, pos, requested, & disconnected |
|
63 * WARNING: Any of the methods manager.send*() should not be |
|
64 * invoked while this lock is held, since they could potentially |
|
65 * block if the underlying connection's transport buffers are |
|
66 * full, and the manager may need to acquire this lock to process |
|
67 * and consume data coming over the underlying connection. |
|
68 */ |
|
69 private Object lock = new Object(); |
|
70 |
|
71 /** level at which more data is requested when read past */ |
|
72 private int waterMark; |
|
73 |
|
74 /** data structure for holding reads of one byte */ |
|
75 private byte temp[] = new byte[1]; |
|
76 |
|
77 /** |
|
78 * Create a new MultiplexInputStream for the given manager. |
|
79 * @param manager object that manages this connection |
|
80 * @param info structure for connection this stream reads from |
|
81 * @param bufferLength length of input buffer |
|
82 */ |
|
83 MultiplexInputStream( |
|
84 ConnectionMultiplexer manager, |
|
85 MultiplexConnectionInfo info, |
|
86 int bufferLength) |
|
87 { |
|
88 this.manager = manager; |
|
89 this.info = info; |
|
90 |
|
91 buffer = new byte[bufferLength]; |
|
92 waterMark = bufferLength / 2; |
|
93 } |
|
94 |
|
95 /** |
|
96 * Read a byte from the connection. |
|
97 */ |
|
98 public synchronized int read() throws IOException |
|
99 { |
|
100 int n = read(temp, 0, 1); |
|
101 if (n != 1) |
|
102 return -1; |
|
103 return temp[0] & 0xFF; |
|
104 } |
|
105 |
|
106 /** |
|
107 * Read a subarray of bytes from connection. This method blocks for |
|
108 * at least one byte, and it returns the number of bytes actually read, |
|
109 * or -1 if the end of the stream was detected. |
|
110 * @param b array to read bytes into |
|
111 * @param off offset of beginning of bytes to read into |
|
112 * @param len number of bytes to read |
|
113 */ |
|
114 public synchronized int read(byte b[], int off, int len) throws IOException |
|
115 { |
|
116 if (len <= 0) |
|
117 return 0; |
|
118 |
|
119 int moreSpace; |
|
120 synchronized (lock) { |
|
121 if (pos >= present) |
|
122 pos = present = 0; |
|
123 else if (pos >= waterMark) { |
|
124 System.arraycopy(buffer, pos, buffer, 0, present - pos); |
|
125 present -= pos; |
|
126 pos = 0; |
|
127 } |
|
128 int freeSpace = buffer.length - present; |
|
129 moreSpace = Math.max(freeSpace - requested, 0); |
|
130 } |
|
131 if (moreSpace > 0) |
|
132 manager.sendRequest(info, moreSpace); |
|
133 synchronized (lock) { |
|
134 requested += moreSpace; |
|
135 while ((pos >= present) && !disconnected) { |
|
136 try { |
|
137 lock.wait(); |
|
138 } catch (InterruptedException e) { |
|
139 } |
|
140 } |
|
141 if (disconnected && pos >= present) |
|
142 return -1; |
|
143 |
|
144 int available = present - pos; |
|
145 if (len < available) { |
|
146 System.arraycopy(buffer, pos, b, off, len); |
|
147 pos += len; |
|
148 return len; |
|
149 } |
|
150 else { |
|
151 System.arraycopy(buffer, pos, b, off, available); |
|
152 pos = present = 0; |
|
153 // could send another request here, if len > available?? |
|
154 return available; |
|
155 } |
|
156 } |
|
157 } |
|
158 |
|
159 /** |
|
160 * Return the number of bytes immediately available for reading. |
|
161 */ |
|
162 public int available() throws IOException |
|
163 { |
|
164 synchronized (lock) { |
|
165 return present - pos; |
|
166 } |
|
167 } |
|
168 |
|
169 /** |
|
170 * Close this connection. |
|
171 */ |
|
172 public void close() throws IOException |
|
173 { |
|
174 manager.sendClose(info); |
|
175 } |
|
176 |
|
177 /** |
|
178 * Receive bytes transmitted from connection at remote endpoint. |
|
179 * @param length number of bytes transmitted |
|
180 * @param in input stream with those bytes ready to be read |
|
181 */ |
|
182 void receive(int length, DataInputStream in) |
|
183 throws IOException |
|
184 { |
|
185 /* TO DO: Optimize so that data received from stream can be loaded |
|
186 * directly into user's buffer if there is a pending read(). |
|
187 */ |
|
188 synchronized (lock) { |
|
189 if ((pos > 0) && ((buffer.length - present) < length)) { |
|
190 System.arraycopy(buffer, pos, buffer, 0, present - pos); |
|
191 present -= pos; |
|
192 pos = 0; |
|
193 } |
|
194 if ((buffer.length - present) < length) |
|
195 throw new IOException("Receive buffer overflow"); |
|
196 in.readFully(buffer, present, length); |
|
197 present += length; |
|
198 requested -= length; |
|
199 lock.notifyAll(); |
|
200 } |
|
201 } |
|
202 |
|
203 /** |
|
204 * Disconnect this stream from all connection activity. |
|
205 */ |
|
206 void disconnect() |
|
207 { |
|
208 synchronized (lock) { |
|
209 disconnected = true; |
|
210 lock.notifyAll(); |
|
211 } |
|
212 } |
|
213 } |
|