1 /* |
|
2 * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package jdk.incubator.http.internal; |
|
27 |
|
28 import java.lang.System.Logger.Level; |
|
29 import java.util.ArrayList; |
|
30 import java.util.Map; |
|
31 import java.util.HashMap; |
|
32 import java.util.Iterator; |
|
33 import java.util.LinkedHashMap; |
|
34 import java.util.List; |
|
35 import java.util.concurrent.locks.ReentrantLock; |
|
36 import jdk.incubator.http.internal.common.Utils; |
|
37 |
|
38 /** |
|
39 * A Send Window Flow-Controller that is used to control outgoing Connection |
|
40 * and Stream flows, per HTTP/2 connection. |
|
41 * |
|
42 * A Http2Connection has its own unique single instance of a WindowController |
|
43 * that it shares with its Streams. Each stream must acquire the appropriate |
|
44 * amount of Send Window from the controller before sending data. |
|
45 * |
|
46 * WINDOW_UPDATE frames, both connection and stream specific, must notify the |
|
47 * controller of their increments. SETTINGS frame's INITIAL_WINDOW_SIZE must |
|
48 * notify the controller so that it can adjust the active stream's window size. |
|
49 */ |
|
50 final class WindowController { |
|
51 |
|
52 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag |
|
53 static final System.Logger DEBUG_LOGGER = |
|
54 Utils.getDebugLogger("WindowController"::toString, DEBUG); |
|
55 |
|
56 /** |
|
57 * Default initial connection Flow-Control Send Window size, as per HTTP/2. |
|
58 */ |
|
59 private static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024 - 1; |
|
60 |
|
61 /** The connection Send Window size. */ |
|
62 private int connectionWindowSize; |
|
63 /** A Map of the active streams, where the key is the stream id, and the |
|
64 * value is the stream's Send Window size, which may be negative. */ |
|
65 private final Map<Integer,Integer> streams = new HashMap<>(); |
|
66 /** A Map of streams awaiting Send Window. The key is the stream id. The |
|
67 * value is a pair of the Stream ( representing the key's stream id ) and |
|
68 * the requested amount of send Window. */ |
|
69 private final Map<Integer, Map.Entry<Stream<?>, Integer>> pending |
|
70 = new LinkedHashMap<>(); |
|
71 |
|
72 private final ReentrantLock controllerLock = new ReentrantLock(); |
|
73 |
|
74 /** A Controller with the default initial window size. */ |
|
75 WindowController() { |
|
76 connectionWindowSize = DEFAULT_INITIAL_WINDOW_SIZE; |
|
77 } |
|
78 |
|
79 // /** A Controller with the given initial window size. */ |
|
80 // WindowController(int initialConnectionWindowSize) { |
|
81 // connectionWindowSize = initialConnectionWindowSize; |
|
82 // } |
|
83 |
|
84 /** Registers the given stream with this controller. */ |
|
85 void registerStream(int streamid, int initialStreamWindowSize) { |
|
86 controllerLock.lock(); |
|
87 try { |
|
88 Integer old = streams.put(streamid, initialStreamWindowSize); |
|
89 if (old != null) |
|
90 throw new InternalError("Unexpected entry [" |
|
91 + old + "] for streamid: " + streamid); |
|
92 } finally { |
|
93 controllerLock.unlock(); |
|
94 } |
|
95 } |
|
96 |
|
97 /** Removes/De-registers the given stream with this controller. */ |
|
98 void removeStream(int streamid) { |
|
99 controllerLock.lock(); |
|
100 try { |
|
101 Integer old = streams.remove(streamid); |
|
102 // Odd stream numbers (client streams) should have been registered. |
|
103 // Even stream numbers (server streams - aka Push Streams) should |
|
104 // not be registered |
|
105 final boolean isClientStream = (streamid % 2) == 1; |
|
106 if (old == null && isClientStream) { |
|
107 throw new InternalError("Expected entry for streamid: " + streamid); |
|
108 } else if (old != null && !isClientStream) { |
|
109 throw new InternalError("Unexpected entry for streamid: " + streamid); |
|
110 } |
|
111 } finally { |
|
112 controllerLock.unlock(); |
|
113 } |
|
114 } |
|
115 |
|
116 /** |
|
117 * Attempts to acquire the requested amount of Send Window for the given |
|
118 * stream. |
|
119 * |
|
120 * The actual amount of Send Window available may differ from the requested |
|
121 * amount. The actual amount, returned by this method, is the minimum of, |
|
122 * 1) the requested amount, 2) the stream's Send Window, and 3) the |
|
123 * connection's Send Window. |
|
124 * |
|
125 * A negative or zero value is returned if there's no window available. |
|
126 * When the result is negative or zero, this method arranges for the |
|
127 * given stream's {@link Stream#signalWindowUpdate()} method to be invoke at |
|
128 * a later time when the connection and/or stream window's have been |
|
129 * increased. The {@code tryAcquire} method should then be invoked again to |
|
130 * attempt to acquire the available window. |
|
131 */ |
|
132 int tryAcquire(int requestAmount, int streamid, Stream<?> stream) { |
|
133 controllerLock.lock(); |
|
134 try { |
|
135 Integer streamSize = streams.get(streamid); |
|
136 if (streamSize == null) |
|
137 throw new InternalError("Expected entry for streamid: " |
|
138 + streamid); |
|
139 int x = Math.min(requestAmount, |
|
140 Math.min(streamSize, connectionWindowSize)); |
|
141 |
|
142 if (x <= 0) { // stream window size may be negative |
|
143 DEBUG_LOGGER.log(Level.DEBUG, |
|
144 "Stream %d requesting %d but only %d available (stream: %d, connection: %d)", |
|
145 streamid, requestAmount, Math.min(streamSize, connectionWindowSize), |
|
146 streamSize, connectionWindowSize); |
|
147 // If there's not enough window size available, put the |
|
148 // caller in a pending list. |
|
149 pending.put(streamid, Map.entry(stream, requestAmount)); |
|
150 return x; |
|
151 } |
|
152 |
|
153 // Remove the caller from the pending list ( if was waiting ). |
|
154 pending.remove(streamid); |
|
155 |
|
156 // Update window sizes and return the allocated amount to the caller. |
|
157 streamSize -= x; |
|
158 streams.put(streamid, streamSize); |
|
159 connectionWindowSize -= x; |
|
160 DEBUG_LOGGER.log(Level.DEBUG, |
|
161 "Stream %d amount allocated %d, now %d available (stream: %d, connection: %d)", |
|
162 streamid, x, Math.min(streamSize, connectionWindowSize), |
|
163 streamSize, connectionWindowSize); |
|
164 return x; |
|
165 } finally { |
|
166 controllerLock.unlock(); |
|
167 } |
|
168 } |
|
169 |
|
170 /** |
|
171 * Increases the Send Window size for the connection. |
|
172 * |
|
173 * A number of awaiting requesters, from unfulfilled tryAcquire requests, |
|
174 * may have their stream's {@link Stream#signalWindowUpdate()} method |
|
175 * scheduled to run ( i.e. awake awaiters ). |
|
176 * |
|
177 * @return false if, and only if, the addition of the given amount would |
|
178 * cause the Send Window to exceed 2^31-1 (overflow), otherwise true |
|
179 */ |
|
180 boolean increaseConnectionWindow(int amount) { |
|
181 List<Stream<?>> candidates = null; |
|
182 controllerLock.lock(); |
|
183 try { |
|
184 int size = connectionWindowSize; |
|
185 size += amount; |
|
186 if (size < 0) |
|
187 return false; |
|
188 connectionWindowSize = size; |
|
189 DEBUG_LOGGER.log(Level.DEBUG, "Connection window size is now %d", size); |
|
190 |
|
191 // Notify waiting streams, until the new increased window size is |
|
192 // effectively exhausted. |
|
193 Iterator<Map.Entry<Integer,Map.Entry<Stream<?>,Integer>>> iter = |
|
194 pending.entrySet().iterator(); |
|
195 |
|
196 while (iter.hasNext() && size > 0) { |
|
197 Map.Entry<Integer,Map.Entry<Stream<?>,Integer>> item = iter.next(); |
|
198 Integer streamSize = streams.get(item.getKey()); |
|
199 if (streamSize == null) { |
|
200 iter.remove(); |
|
201 } else { |
|
202 Map.Entry<Stream<?>,Integer> e = item.getValue(); |
|
203 int requestedAmount = e.getValue(); |
|
204 // only wakes up the pending streams for which there is |
|
205 // at least 1 byte of space in both windows |
|
206 int minAmount = 1; |
|
207 if (size >= minAmount && streamSize >= minAmount) { |
|
208 size -= Math.min(streamSize, requestedAmount); |
|
209 iter.remove(); |
|
210 if (candidates == null) |
|
211 candidates = new ArrayList<>(); |
|
212 candidates.add(e.getKey()); |
|
213 } |
|
214 } |
|
215 } |
|
216 } finally { |
|
217 controllerLock.unlock(); |
|
218 } |
|
219 if (candidates != null) { |
|
220 candidates.forEach(Stream::signalWindowUpdate); |
|
221 } |
|
222 return true; |
|
223 } |
|
224 |
|
225 /** |
|
226 * Increases the Send Window size for the given stream. |
|
227 * |
|
228 * If the given stream is awaiting window size, from an unfulfilled |
|
229 * tryAcquire request, it will have its stream's {@link |
|
230 * Stream#signalWindowUpdate()} method scheduled to run ( i.e. awoken ). |
|
231 * |
|
232 * @return false if, and only if, the addition of the given amount would |
|
233 * cause the Send Window to exceed 2^31-1 (overflow), otherwise true |
|
234 */ |
|
235 boolean increaseStreamWindow(int amount, int streamid) { |
|
236 Stream<?> s = null; |
|
237 controllerLock.lock(); |
|
238 try { |
|
239 Integer size = streams.get(streamid); |
|
240 if (size == null) |
|
241 throw new InternalError("Expected entry for streamid: " + streamid); |
|
242 size += amount; |
|
243 if (size < 0) |
|
244 return false; |
|
245 streams.put(streamid, size); |
|
246 DEBUG_LOGGER.log(Level.DEBUG, |
|
247 "Stream %s window size is now %s", streamid, size); |
|
248 |
|
249 Map.Entry<Stream<?>,Integer> p = pending.get(streamid); |
|
250 if (p != null) { |
|
251 int minAmount = 1; |
|
252 // only wakes up the pending stream if there is at least |
|
253 // 1 byte of space in both windows |
|
254 if (size >= minAmount |
|
255 && connectionWindowSize >= minAmount) { |
|
256 pending.remove(streamid); |
|
257 s = p.getKey(); |
|
258 } |
|
259 } |
|
260 } finally { |
|
261 controllerLock.unlock(); |
|
262 } |
|
263 |
|
264 if (s != null) |
|
265 s.signalWindowUpdate(); |
|
266 |
|
267 return true; |
|
268 } |
|
269 |
|
270 /** |
|
271 * Adjusts, either increases or decreases, the active streams registered |
|
272 * with this controller. May result in a stream's Send Window size becoming |
|
273 * negative. |
|
274 */ |
|
275 void adjustActiveStreams(int adjustAmount) { |
|
276 assert adjustAmount != 0; |
|
277 |
|
278 controllerLock.lock(); |
|
279 try { |
|
280 for (Map.Entry<Integer,Integer> entry : streams.entrySet()) { |
|
281 int streamid = entry.getKey(); |
|
282 // the API only supports sending on Streams initialed by |
|
283 // the client, i.e. odd stream numbers |
|
284 if (streamid != 0 && (streamid % 2) != 0) { |
|
285 Integer size = entry.getValue(); |
|
286 size += adjustAmount; |
|
287 streams.put(streamid, size); |
|
288 DEBUG_LOGGER.log(Level.DEBUG, |
|
289 "Stream %s window size is now %s", streamid, size); |
|
290 } |
|
291 } |
|
292 } finally { |
|
293 controllerLock.unlock(); |
|
294 } |
|
295 } |
|
296 |
|
297 /** Returns the Send Window size for the connection. */ |
|
298 int connectionWindowSize() { |
|
299 controllerLock.lock(); |
|
300 try { |
|
301 return connectionWindowSize; |
|
302 } finally { |
|
303 controllerLock.unlock(); |
|
304 } |
|
305 } |
|
306 |
|
307 // /** Returns the Send Window size for the given stream. */ |
|
308 // int streamWindowSize(int streamid) { |
|
309 // controllerLock.lock(); |
|
310 // try { |
|
311 // Integer size = streams.get(streamid); |
|
312 // if (size == null) |
|
313 // throw new InternalError("Expected entry for streamid: " + streamid); |
|
314 // return size; |
|
315 // } finally { |
|
316 // controllerLock.unlock(); |
|
317 // } |
|
318 // } |
|
319 |
|
320 } |
|