44 * It's a stateful class due to the fact that FramesDecoder stores buffers inside. |
45 * It's a stateful class due to the fact that FramesDecoder stores buffers inside. |
45 * Should be allocated only the single instance per connection. |
46 * Should be allocated only the single instance per connection. |
46 */ |
47 */ |
47 public class FramesDecoder { |
48 public class FramesDecoder { |
48 |
49 |
49 |
50 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
|
51 static final System.Logger DEBUG_LOGGER = |
|
52 Utils.getDebugLogger("FramesDecoder"::toString, DEBUG); |
50 |
53 |
51 @FunctionalInterface |
54 @FunctionalInterface |
52 public interface FrameProcessor { |
55 public interface FrameProcessor { |
53 void processFrame(Http2Frame frame) throws IOException; |
56 void processFrame(Http2Frame frame) throws IOException; |
54 } |
57 } |
55 |
58 |
56 private final FrameProcessor frameProcessor; |
59 private final FrameProcessor frameProcessor; |
57 private final int maxFrameSize; |
60 private final int maxFrameSize; |
58 |
61 |
59 private ByteBufferReference currentBuffer; // current buffer either null or hasRemaining |
62 private ByteBuffer currentBuffer; // current buffer either null or hasRemaining |
60 |
63 |
61 private final java.util.Queue<ByteBufferReference> tailBuffers = new ArrayDeque<>(); |
64 private final ArrayDeque<ByteBuffer> tailBuffers = new ArrayDeque<>(); |
62 private int tailSize = 0; |
65 private int tailSize = 0; |
63 |
66 |
64 private boolean slicedToDataFrame = false; |
67 private boolean slicedToDataFrame = false; |
65 |
68 |
66 private final List<ByteBufferReference> prepareToRelease = new ArrayList<>(); |
69 private final List<ByteBuffer> prepareToRelease = new ArrayList<>(); |
67 |
70 |
68 // if true - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning |
71 // if true - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning |
69 // otherwise - stopped at frames boundary |
72 // otherwise - stopped at frames boundary |
70 private boolean frameHeaderParsed = false; |
73 private boolean frameHeaderParsed = false; |
71 private int frameLength; |
74 private int frameLength; |
72 private int frameType; |
75 private int frameType; |
73 private int frameFlags; |
76 private int frameFlags; |
74 private int frameStreamid; |
77 private int frameStreamid; |
|
78 private boolean closed; |
75 |
79 |
76 /** |
80 /** |
77 * Creates Frame Decoder |
81 * Creates Frame Decoder |
78 * |
82 * |
79 * @param frameProcessor - callback for decoded frames |
83 * @param frameProcessor - callback for decoded frames |
90 public FramesDecoder(FrameProcessor frameProcessor, int maxFrameSize) { |
94 public FramesDecoder(FrameProcessor frameProcessor, int maxFrameSize) { |
91 this.frameProcessor = frameProcessor; |
95 this.frameProcessor = frameProcessor; |
92 this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1); |
96 this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1); |
93 } |
97 } |
94 |
98 |
|
99 /** Threshold beyond which data is no longer copied into the current buffer, |
|
100 * if that buffer has enough unused space. */ |
|
101 private static final int COPY_THRESHOLD = 8192; |
|
102 |
95 /** |
103 /** |
96 * put next buffer into queue, |
104 * Adds the data from the given buffer, and performs frame decoding if |
97 * if frame decoding is possible - decode all buffers and invoke FrameProcessor |
105 * possible. Either 1) appends the data from the given buffer to the |
|
106 * current buffer ( if there is enough unused space ), or 2) adds it to the |
|
107 * next buffer in the queue. |
98 * |
108 * |
99 * @param buffer |
109 * If there is enough data to perform frame decoding then, all buffers are |
100 * @throws IOException |
110 * decoded and the FrameProcessor is invoked. |
101 */ |
111 */ |
102 public void decode(ByteBufferReference buffer) throws IOException { |
112 public void decode(ByteBuffer inBoundBuffer) throws IOException { |
103 int remaining = buffer.get().remaining(); |
113 if (closed) { |
|
114 DEBUG_LOGGER.log(Level.DEBUG, "closed: ignoring buffer (%s bytes)", |
|
115 inBoundBuffer.remaining()); |
|
116 inBoundBuffer.position(inBoundBuffer.limit()); |
|
117 return; |
|
118 } |
|
119 int remaining = inBoundBuffer.remaining(); |
|
120 DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining); |
104 if (remaining > 0) { |
121 if (remaining > 0) { |
105 if (currentBuffer == null) { |
122 if (currentBuffer == null) { |
106 currentBuffer = buffer; |
123 currentBuffer = inBoundBuffer; |
107 } else { |
124 } else { |
108 tailBuffers.add(buffer); |
125 ByteBuffer b = currentBuffer; |
109 tailSize += remaining; |
126 if (!tailBuffers.isEmpty()) { |
110 } |
127 b = tailBuffers.getLast(); |
111 } |
128 } |
|
129 |
|
130 int limit = b.limit(); |
|
131 int freeSpace = b.capacity() - limit; |
|
132 if (remaining <= COPY_THRESHOLD && freeSpace >= remaining) { |
|
133 // append the new data to the unused space in the current buffer |
|
134 int position = b.position(); |
|
135 b.position(limit); |
|
136 b.limit(limit + inBoundBuffer.remaining()); |
|
137 b.put(inBoundBuffer); |
|
138 b.position(position); |
|
139 if (b != currentBuffer) |
|
140 tailSize += remaining; |
|
141 DEBUG_LOGGER.log(Level.DEBUG, "copied: %d", remaining); |
|
142 } else { |
|
143 DEBUG_LOGGER.log(Level.DEBUG, "added: %d", remaining); |
|
144 tailBuffers.add(inBoundBuffer); |
|
145 tailSize += remaining; |
|
146 } |
|
147 } |
|
148 } |
|
149 DEBUG_LOGGER.log(Level.DEBUG, "Tail size is now: %d, current=", |
|
150 tailSize, |
|
151 (currentBuffer == null ? 0 : |
|
152 currentBuffer.remaining())); |
112 Http2Frame frame; |
153 Http2Frame frame; |
113 while ((frame = nextFrame()) != null) { |
154 while ((frame = nextFrame()) != null) { |
|
155 DEBUG_LOGGER.log(Level.DEBUG, "Got frame: %s", frame); |
114 frameProcessor.processFrame(frame); |
156 frameProcessor.processFrame(frame); |
115 frameProcessed(); |
157 frameProcessed(); |
116 } |
158 } |
117 } |
159 } |
118 |
160 |
119 private Http2Frame nextFrame() throws IOException { |
161 private Http2Frame nextFrame() throws IOException { |
120 while (true) { |
162 while (true) { |
121 if (currentBuffer == null) { |
163 if (currentBuffer == null) { |
122 return null; // no data at all |
164 return null; // no data at all |
123 } |
165 } |
|
166 long available = currentBuffer.remaining() + tailSize; |
124 if (!frameHeaderParsed) { |
167 if (!frameHeaderParsed) { |
125 if (currentBuffer.get().remaining() + tailSize >= Http2Frame.FRAME_HEADER_SIZE) { |
168 if (available >= Http2Frame.FRAME_HEADER_SIZE) { |
126 parseFrameHeader(); |
169 parseFrameHeader(); |
127 if (frameLength > maxFrameSize) { |
170 if (frameLength > maxFrameSize) { |
128 // connection error |
171 // connection error |
129 return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR, |
172 return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR, |
130 "Frame type("+frameType+") " +"length("+frameLength+") exceeds MAX_FRAME_SIZE("+ maxFrameSize+")"); |
173 "Frame type("+frameType+") " |
|
174 +"length("+frameLength |
|
175 +") exceeds MAX_FRAME_SIZE(" |
|
176 + maxFrameSize+")"); |
131 } |
177 } |
132 frameHeaderParsed = true; |
178 frameHeaderParsed = true; |
133 } else { |
179 } else { |
134 return null; // no data for frame header |
180 DEBUG_LOGGER.log(Level.DEBUG, |
|
181 "Not enough data to parse header, needs: %d, has: %d", |
|
182 Http2Frame.FRAME_HEADER_SIZE, available); |
|
183 return null; |
135 } |
184 } |
136 } |
185 } |
|
186 available = currentBuffer == null ? 0 : currentBuffer.remaining() + tailSize; |
137 if ((frameLength == 0) || |
187 if ((frameLength == 0) || |
138 (currentBuffer != null && currentBuffer.get().remaining() + tailSize >= frameLength)) { |
188 (currentBuffer != null && available >= frameLength)) { |
139 Http2Frame frame = parseFrameBody(); |
189 Http2Frame frame = parseFrameBody(); |
140 frameHeaderParsed = false; |
190 frameHeaderParsed = false; |
141 // frame == null means we have to skip this frame and try parse next |
191 // frame == null means we have to skip this frame and try parse next |
142 if (frame != null) { |
192 if (frame != null) { |
143 return frame; |
193 return frame; |
144 } |
194 } |
145 } else { |
195 } else { |
|
196 DEBUG_LOGGER.log(Level.DEBUG, |
|
197 "Not enough data to parse frame body, needs: %d, has: %d", |
|
198 frameLength, available); |
146 return null; // no data for the whole frame header |
199 return null; // no data for the whole frame header |
147 } |
200 } |
148 } |
201 } |
149 } |
202 } |
150 |
203 |
151 private void frameProcessed() { |
204 private void frameProcessed() { |
152 prepareToRelease.forEach(ByteBufferReference::clear); |
|
153 prepareToRelease.clear(); |
205 prepareToRelease.clear(); |
154 } |
206 } |
155 |
207 |
156 private void parseFrameHeader() throws IOException { |
208 private void parseFrameHeader() throws IOException { |
157 int x = getInt(); |
209 int x = getInt(); |
158 this.frameLength = x >> 8; |
210 this.frameLength = (x >>> 8) & 0x00ffffff; |
159 this.frameType = x & 0xff; |
211 this.frameType = x & 0xff; |
160 this.frameFlags = getByte(); |
212 this.frameFlags = getByte(); |
161 this.frameStreamid = getInt() & 0x7fffffff; |
213 this.frameStreamid = getInt() & 0x7fffffff; |
162 // R: A reserved 1-bit field. The semantics of this bit are undefined, |
214 // R: A reserved 1-bit field. The semantics of this bit are undefined, |
163 // MUST be ignored when receiving. |
215 // MUST be ignored when receiving. |
164 } |
216 } |
165 |
217 |
166 // move next buffer from tailBuffers to currentBuffer if required |
218 // move next buffer from tailBuffers to currentBuffer if required |
167 private void nextBuffer() { |
219 private void nextBuffer() { |
168 if (!currentBuffer.get().hasRemaining()) { |
220 if (!currentBuffer.hasRemaining()) { |
169 if (!slicedToDataFrame) { |
221 if (!slicedToDataFrame) { |
170 prepareToRelease.add(currentBuffer); |
222 prepareToRelease.add(currentBuffer); |
171 } |
223 } |
172 slicedToDataFrame = false; |
224 slicedToDataFrame = false; |
173 currentBuffer = tailBuffers.poll(); |
225 currentBuffer = tailBuffers.poll(); |
174 if (currentBuffer != null) { |
226 if (currentBuffer != null) { |
175 tailSize -= currentBuffer.get().remaining(); |
227 tailSize -= currentBuffer.remaining(); |
176 } |
228 } |
177 } |
229 } |
178 } |
230 } |
179 |
231 |
180 public int getByte() { |
232 public int getByte() { |
181 ByteBuffer buf = currentBuffer.get(); |
233 int res = currentBuffer.get() & 0xff; |
182 int res = buf.get() & 0xff; |
|
183 nextBuffer(); |
234 nextBuffer(); |
184 return res; |
235 return res; |
185 } |
236 } |
186 |
237 |
187 public int getShort() { |
238 public int getShort() { |
188 ByteBuffer buf = currentBuffer.get(); |
239 if (currentBuffer.remaining() >= 2) { |
189 if (buf.remaining() >= 2) { |
240 int res = currentBuffer.getShort() & 0xffff; |
190 int res = buf.getShort() & 0xffff; |
|
191 nextBuffer(); |
241 nextBuffer(); |
192 return res; |
242 return res; |
193 } |
243 } |
194 int val = getByte(); |
244 int val = getByte(); |
195 val = (val << 8) + getByte(); |
245 val = (val << 8) + getByte(); |
196 return val; |
246 return val; |
197 } |
247 } |
198 |
248 |
199 public int getInt() { |
249 public int getInt() { |
200 ByteBuffer buf = currentBuffer.get(); |
250 if (currentBuffer.remaining() >= 4) { |
201 if (buf.remaining() >= 4) { |
251 int res = currentBuffer.getInt(); |
202 int res = buf.getInt(); |
|
203 nextBuffer(); |
252 nextBuffer(); |
204 return res; |
253 return res; |
205 } |
254 } |
206 int val = getByte(); |
255 int val = getByte(); |
207 val = (val << 8) + getByte(); |
256 val = (val << 8) + getByte(); |
213 |
262 |
214 public byte[] getBytes(int n) { |
263 public byte[] getBytes(int n) { |
215 byte[] bytes = new byte[n]; |
264 byte[] bytes = new byte[n]; |
216 int offset = 0; |
265 int offset = 0; |
217 while (n > 0) { |
266 while (n > 0) { |
218 ByteBuffer buf = currentBuffer.get(); |
267 int length = Math.min(n, currentBuffer.remaining()); |
219 int length = Math.min(n, buf.remaining()); |
268 currentBuffer.get(bytes, offset, length); |
220 buf.get(bytes, offset, length); |
|
221 offset += length; |
269 offset += length; |
222 n -= length; |
270 n -= length; |
223 nextBuffer(); |
271 nextBuffer(); |
224 } |
272 } |
225 return bytes; |
273 return bytes; |
226 |
274 |
227 } |
275 } |
228 |
276 |
229 private ByteBufferReference[] getBuffers(boolean isDataFrame, int bytecount) { |
277 private List<ByteBuffer> getBuffers(boolean isDataFrame, int bytecount) { |
230 List<ByteBufferReference> res = new ArrayList<>(); |
278 List<ByteBuffer> res = new ArrayList<>(); |
231 while (bytecount > 0) { |
279 while (bytecount > 0) { |
232 ByteBuffer buf = currentBuffer.get(); |
280 int remaining = currentBuffer.remaining(); |
233 int remaining = buf.remaining(); |
|
234 int extract = Math.min(remaining, bytecount); |
281 int extract = Math.min(remaining, bytecount); |
235 ByteBuffer extractedBuf; |
282 ByteBuffer extractedBuf; |
236 if (isDataFrame) { |
283 if (isDataFrame) { |
237 extractedBuf = Utils.slice(buf, extract); |
284 extractedBuf = Utils.slice(currentBuffer, extract); |
238 slicedToDataFrame = true; |
285 slicedToDataFrame = true; |
239 } else { |
286 } else { |
240 // Header frames here |
287 // Header frames here |
241 // HPACK decoding should performed under lock and immediately after frame decoding. |
288 // HPACK decoding should performed under lock and immediately after frame decoding. |
242 // in that case it is safe to release original buffer, |
289 // in that case it is safe to release original buffer, |
243 // because of sliced buffer has a very short life |
290 // because of sliced buffer has a very short life |
244 extractedBuf = Utils.slice(buf, extract); |
291 extractedBuf = Utils.slice(currentBuffer, extract); |
245 } |
292 } |
246 res.add(ByteBufferReference.of(extractedBuf)); |
293 res.add(extractedBuf); |
247 bytecount -= extract; |
294 bytecount -= extract; |
248 nextBuffer(); |
295 nextBuffer(); |
249 } |
296 } |
250 return res.toArray(new ByteBufferReference[0]); |
297 return res; |
|
298 } |
|
299 |
|
300 public void close(String msg) { |
|
301 closed = true; |
|
302 tailBuffers.clear(); |
|
303 int bytes = tailSize; |
|
304 ByteBuffer b = currentBuffer; |
|
305 if (b != null) { |
|
306 bytes += b.remaining(); |
|
307 b.position(b.limit()); |
|
308 } |
|
309 tailSize = 0; |
|
310 currentBuffer = null; |
|
311 DEBUG_LOGGER.log(Level.DEBUG, "closed %s, ignoring %d bytes", msg, bytes); |
251 } |
312 } |
252 |
313 |
253 public void skipBytes(int bytecount) { |
314 public void skipBytes(int bytecount) { |
254 while (bytecount > 0) { |
315 while (bytecount > 0) { |
255 ByteBuffer buf = currentBuffer.get(); |
316 int remaining = currentBuffer.remaining(); |
256 int remaining = buf.remaining(); |
|
257 int extract = Math.min(remaining, bytecount); |
317 int extract = Math.min(remaining, bytecount); |
258 buf.position(buf.position() + extract); |
318 currentBuffer.position(currentBuffer.position() + extract); |
259 bytecount -= remaining; |
319 bytecount -= remaining; |
260 nextBuffer(); |
320 nextBuffer(); |
261 } |
321 } |
262 } |
322 } |
263 |
323 |