src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java
changeset 48083 b1c1b4ef4be2
parent 47216 71c04702a3d5
child 48703 3eae36c6baa5
child 55973 4d9b002587db
equal deleted inserted replaced
48081:89829dd3cc54 48083:b1c1b4ef4be2
     1 /*
     1 /*
     2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http.internal.frame;
    26 package jdk.incubator.http.internal.frame;
    27 
    27 
    28 import jdk.incubator.http.internal.common.ByteBufferReference;
       
    29 import jdk.incubator.http.internal.common.Log;
    28 import jdk.incubator.http.internal.common.Log;
    30 import jdk.incubator.http.internal.common.Utils;
    29 import jdk.incubator.http.internal.common.Utils;
    31 
    30 
    32 import java.io.IOException;
    31 import java.io.IOException;
       
    32 import java.lang.System.Logger.Level;
    33 import java.nio.ByteBuffer;
    33 import java.nio.ByteBuffer;
    34 import java.util.ArrayDeque;
    34 import java.util.ArrayDeque;
    35 import java.util.ArrayList;
    35 import java.util.ArrayList;
    36 import java.util.List;
    36 import java.util.List;
       
    37 import java.util.Queue;
    37 
    38 
    38 /**
    39 /**
    39  * Frames Decoder
    40  * Frames Decoder
    40  * <p>
    41  * <p>
    41  * collect buffers until frame decoding is possible,
    42  * collect buffers until frame decoding is possible,
    44  * It's a stateful class due to the fact that FramesDecoder stores buffers inside.
    45  * It's a stateful class due to the fact that FramesDecoder stores buffers inside.
     45  * Only a single instance should be allocated per connection.
     46  * Only a single instance should be allocated per connection.
    46  */
    47  */
    47 public class FramesDecoder {
    48 public class FramesDecoder {
    48 
    49 
    49 
    50     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    51     static final System.Logger DEBUG_LOGGER =
       
    52             Utils.getDebugLogger("FramesDecoder"::toString, DEBUG);
    50 
    53 
    51     @FunctionalInterface
    54     @FunctionalInterface
    52     public interface FrameProcessor {
    55     public interface FrameProcessor {
    53         void processFrame(Http2Frame frame) throws IOException;
    56         void processFrame(Http2Frame frame) throws IOException;
    54     }
    57     }
    55 
    58 
    56     private final FrameProcessor frameProcessor;
    59     private final FrameProcessor frameProcessor;
    57     private final int maxFrameSize;
    60     private final int maxFrameSize;
    58 
    61 
    59     private ByteBufferReference currentBuffer; // current buffer either null or hasRemaining
    62     private ByteBuffer currentBuffer; // current buffer either null or hasRemaining
    60 
    63 
    61     private final java.util.Queue<ByteBufferReference> tailBuffers = new ArrayDeque<>();
    64     private final ArrayDeque<ByteBuffer> tailBuffers = new ArrayDeque<>();
    62     private int tailSize = 0;
    65     private int tailSize = 0;
    63 
    66 
    64     private boolean slicedToDataFrame = false;
    67     private boolean slicedToDataFrame = false;
    65 
    68 
    66     private final List<ByteBufferReference> prepareToRelease = new ArrayList<>();
    69     private final List<ByteBuffer> prepareToRelease = new ArrayList<>();
    67 
    70 
    68     // if true  - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning
    71     // if true  - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning
    69     // otherwise - stopped at frames boundary
    72     // otherwise - stopped at frames boundary
    70     private boolean frameHeaderParsed = false;
    73     private boolean frameHeaderParsed = false;
    71     private int frameLength;
    74     private int frameLength;
    72     private int frameType;
    75     private int frameType;
    73     private int frameFlags;
    76     private int frameFlags;
    74     private int frameStreamid;
    77     private int frameStreamid;
       
    78     private boolean closed;
    75 
    79 
    76     /**
    80     /**
    77      * Creates Frame Decoder
    81      * Creates Frame Decoder
    78      *
    82      *
    79      * @param frameProcessor - callback for decoded frames
    83      * @param frameProcessor - callback for decoded frames
    90     public FramesDecoder(FrameProcessor frameProcessor, int maxFrameSize) {
    94     public FramesDecoder(FrameProcessor frameProcessor, int maxFrameSize) {
    91         this.frameProcessor = frameProcessor;
    95         this.frameProcessor = frameProcessor;
    92         this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1);
    96         this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1);
    93     }
    97     }
    94 
    98 
       
    99     /** Threshold beyond which data is no longer copied into the current buffer,
       
   100      * if that buffer has enough unused space. */
       
   101     private static final int COPY_THRESHOLD = 8192;
       
   102 
    95     /**
   103     /**
    96      * put next buffer into queue,
   104      * Adds the data from the given buffer, and performs frame decoding if
    97      * if frame decoding is possible - decode all buffers and invoke FrameProcessor
   105      * possible.   Either 1) appends the data from the given buffer to the
       
   106      * current buffer ( if there is enough unused space ), or 2) adds it to the
       
   107      * next buffer in the queue.
    98      *
   108      *
    99      * @param buffer
   109      * If there is enough data to perform frame decoding then, all buffers are
   100      * @throws IOException
   110      * decoded and the FrameProcessor is invoked.
   101      */
   111      */
   102     public void decode(ByteBufferReference buffer) throws IOException {
   112     public void decode(ByteBuffer inBoundBuffer) throws IOException {
   103         int remaining = buffer.get().remaining();
   113         if (closed) {
       
   114             DEBUG_LOGGER.log(Level.DEBUG, "closed: ignoring buffer (%s bytes)",
       
   115                     inBoundBuffer.remaining());
       
   116             inBoundBuffer.position(inBoundBuffer.limit());
       
   117             return;
       
   118         }
       
   119         int remaining = inBoundBuffer.remaining();
       
   120         DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining);
   104         if (remaining > 0) {
   121         if (remaining > 0) {
   105             if (currentBuffer == null) {
   122             if (currentBuffer == null) {
   106                 currentBuffer = buffer;
   123                 currentBuffer = inBoundBuffer;
   107             } else {
   124             } else {
   108                 tailBuffers.add(buffer);
   125                 ByteBuffer b = currentBuffer;
   109                 tailSize += remaining;
   126                 if (!tailBuffers.isEmpty()) {
   110             }
   127                     b = tailBuffers.getLast();
   111         }
   128                 }
       
   129 
       
   130                 int limit = b.limit();
       
   131                 int freeSpace = b.capacity() - limit;
       
   132                 if (remaining <= COPY_THRESHOLD && freeSpace >= remaining) {
       
   133                     // append the new data to the unused space in the current buffer
       
   134                     int position = b.position();
       
   135                     b.position(limit);
       
   136                     b.limit(limit + inBoundBuffer.remaining());
       
   137                     b.put(inBoundBuffer);
       
   138                     b.position(position);
       
   139                     if (b != currentBuffer)
       
   140                         tailSize += remaining;
       
   141                     DEBUG_LOGGER.log(Level.DEBUG, "copied: %d", remaining);
       
   142                 } else {
       
   143                     DEBUG_LOGGER.log(Level.DEBUG, "added: %d", remaining);
       
   144                     tailBuffers.add(inBoundBuffer);
       
   145                     tailSize += remaining;
       
   146                 }
       
   147             }
       
   148         }
       
   149         DEBUG_LOGGER.log(Level.DEBUG, "Tail size is now: %d, current=",
       
   150                 tailSize,
       
   151                 (currentBuffer == null ? 0 :
       
   152                    currentBuffer.remaining()));
   112         Http2Frame frame;
   153         Http2Frame frame;
   113         while ((frame = nextFrame()) != null) {
   154         while ((frame = nextFrame()) != null) {
       
   155             DEBUG_LOGGER.log(Level.DEBUG, "Got frame: %s", frame);
   114             frameProcessor.processFrame(frame);
   156             frameProcessor.processFrame(frame);
   115             frameProcessed();
   157             frameProcessed();
   116         }
   158         }
   117     }
   159     }
   118 
   160 
   119     private Http2Frame nextFrame() throws IOException {
   161     private Http2Frame nextFrame() throws IOException {
   120         while (true) {
   162         while (true) {
   121             if (currentBuffer == null) {
   163             if (currentBuffer == null) {
   122                 return null; // no data at all
   164                 return null; // no data at all
   123             }
   165             }
       
   166             long available = currentBuffer.remaining() + tailSize;
   124             if (!frameHeaderParsed) {
   167             if (!frameHeaderParsed) {
   125                 if (currentBuffer.get().remaining() + tailSize >= Http2Frame.FRAME_HEADER_SIZE) {
   168                 if (available >= Http2Frame.FRAME_HEADER_SIZE) {
   126                     parseFrameHeader();
   169                     parseFrameHeader();
   127                     if (frameLength > maxFrameSize) {
   170                     if (frameLength > maxFrameSize) {
   128                         // connection error
   171                         // connection error
   129                         return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
   172                         return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
   130                                 "Frame type("+frameType+") " +"length("+frameLength+") exceeds MAX_FRAME_SIZE("+ maxFrameSize+")");
   173                                 "Frame type("+frameType+") "
       
   174                                 +"length("+frameLength
       
   175                                 +") exceeds MAX_FRAME_SIZE("
       
   176                                 + maxFrameSize+")");
   131                     }
   177                     }
   132                     frameHeaderParsed = true;
   178                     frameHeaderParsed = true;
   133                 } else {
   179                 } else {
   134                     return null; // no data for frame header
   180                     DEBUG_LOGGER.log(Level.DEBUG,
       
   181                             "Not enough data to parse header, needs: %d, has: %d",
       
   182                             Http2Frame.FRAME_HEADER_SIZE, available);
       
   183                     return null;
   135                 }
   184                 }
   136             }
   185             }
       
   186             available = currentBuffer == null ? 0 : currentBuffer.remaining() + tailSize;
   137             if ((frameLength == 0) ||
   187             if ((frameLength == 0) ||
   138                     (currentBuffer != null && currentBuffer.get().remaining() + tailSize >= frameLength)) {
   188                     (currentBuffer != null && available >= frameLength)) {
   139                 Http2Frame frame = parseFrameBody();
   189                 Http2Frame frame = parseFrameBody();
   140                 frameHeaderParsed = false;
   190                 frameHeaderParsed = false;
   141                 // frame == null means we have to skip this frame and try parse next
   191                 // frame == null means we have to skip this frame and try parse next
   142                 if (frame != null) {
   192                 if (frame != null) {
   143                     return frame;
   193                     return frame;
   144                 }
   194                 }
   145             } else {
   195             } else {
       
   196                 DEBUG_LOGGER.log(Level.DEBUG,
       
   197                         "Not enough data to parse frame body, needs: %d,  has: %d",
       
   198                         frameLength, available);
   146                 return null;  // no data for the whole frame header
   199                 return null;  // no data for the whole frame header
   147             }
   200             }
   148         }
   201         }
   149     }
   202     }
   150 
   203 
   151     private void frameProcessed() {
   204     private void frameProcessed() {
   152         prepareToRelease.forEach(ByteBufferReference::clear);
       
   153         prepareToRelease.clear();
   205         prepareToRelease.clear();
   154     }
   206     }
   155 
   207 
   156     private void parseFrameHeader() throws IOException {
   208     private void parseFrameHeader() throws IOException {
   157         int x = getInt();
   209         int x = getInt();
   158         this.frameLength = x >> 8;
   210         this.frameLength = (x >>> 8) & 0x00ffffff;
   159         this.frameType = x & 0xff;
   211         this.frameType = x & 0xff;
   160         this.frameFlags = getByte();
   212         this.frameFlags = getByte();
   161         this.frameStreamid = getInt() & 0x7fffffff;
   213         this.frameStreamid = getInt() & 0x7fffffff;
   162         // R: A reserved 1-bit field.  The semantics of this bit are undefined,
   214         // R: A reserved 1-bit field.  The semantics of this bit are undefined,
   163         // MUST be ignored when receiving.
   215         // MUST be ignored when receiving.
   164     }
   216     }
   165 
   217 
   166     // move next buffer from tailBuffers to currentBuffer if required
   218     // move next buffer from tailBuffers to currentBuffer if required
   167     private void nextBuffer() {
   219     private void nextBuffer() {
   168         if (!currentBuffer.get().hasRemaining()) {
   220         if (!currentBuffer.hasRemaining()) {
   169             if (!slicedToDataFrame) {
   221             if (!slicedToDataFrame) {
   170                 prepareToRelease.add(currentBuffer);
   222                 prepareToRelease.add(currentBuffer);
   171             }
   223             }
   172             slicedToDataFrame = false;
   224             slicedToDataFrame = false;
   173             currentBuffer = tailBuffers.poll();
   225             currentBuffer = tailBuffers.poll();
   174             if (currentBuffer != null) {
   226             if (currentBuffer != null) {
   175                 tailSize -= currentBuffer.get().remaining();
   227                 tailSize -= currentBuffer.remaining();
   176             }
   228             }
   177         }
   229         }
   178     }
   230     }
   179 
   231 
   180     public int getByte() {
   232     public int getByte() {
   181         ByteBuffer buf = currentBuffer.get();
   233         int res = currentBuffer.get() & 0xff;
   182         int res = buf.get() & 0xff;
       
   183         nextBuffer();
   234         nextBuffer();
   184         return res;
   235         return res;
   185     }
   236     }
   186 
   237 
   187     public int getShort() {
   238     public int getShort() {
   188         ByteBuffer buf = currentBuffer.get();
   239         if (currentBuffer.remaining() >= 2) {
   189         if (buf.remaining() >= 2) {
   240             int res = currentBuffer.getShort() & 0xffff;
   190             int res = buf.getShort() & 0xffff;
       
   191             nextBuffer();
   241             nextBuffer();
   192             return res;
   242             return res;
   193         }
   243         }
   194         int val = getByte();
   244         int val = getByte();
   195         val = (val << 8) + getByte();
   245         val = (val << 8) + getByte();
   196         return val;
   246         return val;
   197     }
   247     }
   198 
   248 
   199     public int getInt() {
   249     public int getInt() {
   200         ByteBuffer buf = currentBuffer.get();
   250         if (currentBuffer.remaining() >= 4) {
   201         if (buf.remaining() >= 4) {
   251             int res = currentBuffer.getInt();
   202             int res = buf.getInt();
       
   203             nextBuffer();
   252             nextBuffer();
   204             return res;
   253             return res;
   205         }
   254         }
   206         int val = getByte();
   255         int val = getByte();
   207         val = (val << 8) + getByte();
   256         val = (val << 8) + getByte();
   213 
   262 
   214     public byte[] getBytes(int n) {
   263     public byte[] getBytes(int n) {
   215         byte[] bytes = new byte[n];
   264         byte[] bytes = new byte[n];
   216         int offset = 0;
   265         int offset = 0;
   217         while (n > 0) {
   266         while (n > 0) {
   218             ByteBuffer buf = currentBuffer.get();
   267             int length = Math.min(n, currentBuffer.remaining());
   219             int length = Math.min(n, buf.remaining());
   268             currentBuffer.get(bytes, offset, length);
   220             buf.get(bytes, offset, length);
       
   221             offset += length;
   269             offset += length;
   222             n -= length;
   270             n -= length;
   223             nextBuffer();
   271             nextBuffer();
   224         }
   272         }
   225         return bytes;
   273         return bytes;
   226 
   274 
   227     }
   275     }
   228 
   276 
   229     private ByteBufferReference[] getBuffers(boolean isDataFrame, int bytecount) {
   277     private List<ByteBuffer> getBuffers(boolean isDataFrame, int bytecount) {
   230         List<ByteBufferReference> res = new ArrayList<>();
   278         List<ByteBuffer> res = new ArrayList<>();
   231         while (bytecount > 0) {
   279         while (bytecount > 0) {
   232             ByteBuffer buf = currentBuffer.get();
   280             int remaining = currentBuffer.remaining();
   233             int remaining = buf.remaining();
       
   234             int extract = Math.min(remaining, bytecount);
   281             int extract = Math.min(remaining, bytecount);
   235             ByteBuffer extractedBuf;
   282             ByteBuffer extractedBuf;
   236             if (isDataFrame) {
   283             if (isDataFrame) {
   237                 extractedBuf = Utils.slice(buf, extract);
   284                 extractedBuf = Utils.slice(currentBuffer, extract);
   238                 slicedToDataFrame = true;
   285                 slicedToDataFrame = true;
   239             } else {
   286             } else {
   240                 // Header frames here
   287                 // Header frames here
   241                 // HPACK decoding should performed under lock and immediately after frame decoding.
   288                 // HPACK decoding should performed under lock and immediately after frame decoding.
   242                 // in that case it is safe to release original buffer,
   289                 // in that case it is safe to release original buffer,
   243                 // because of sliced buffer has a very short life
   290                 // because of sliced buffer has a very short life
   244                 extractedBuf = Utils.slice(buf, extract);
   291                 extractedBuf = Utils.slice(currentBuffer, extract);
   245             }
   292             }
   246             res.add(ByteBufferReference.of(extractedBuf));
   293             res.add(extractedBuf);
   247             bytecount -= extract;
   294             bytecount -= extract;
   248             nextBuffer();
   295             nextBuffer();
   249         }
   296         }
   250         return res.toArray(new ByteBufferReference[0]);
   297         return res;
       
   298     }
       
   299 
       
   300     public void close(String msg) {
       
   301         closed = true;
       
   302         tailBuffers.clear();
       
   303         int bytes = tailSize;
       
   304         ByteBuffer b = currentBuffer;
       
   305         if (b != null) {
       
   306             bytes += b.remaining();
       
   307             b.position(b.limit());
       
   308         }
       
   309         tailSize = 0;
       
   310         currentBuffer = null;
       
   311         DEBUG_LOGGER.log(Level.DEBUG, "closed %s, ignoring %d bytes", msg, bytes);
   251     }
   312     }
   252 
   313 
   253     public void skipBytes(int bytecount) {
   314     public void skipBytes(int bytecount) {
   254         while (bytecount > 0) {
   315         while (bytecount > 0) {
   255             ByteBuffer buf = currentBuffer.get();
   316             int remaining = currentBuffer.remaining();
   256             int remaining = buf.remaining();
       
   257             int extract = Math.min(remaining, bytecount);
   317             int extract = Math.min(remaining, bytecount);
   258             buf.position(buf.position() + extract);
   318             currentBuffer.position(currentBuffer.position() + extract);
   259             bytecount -= remaining;
   319             bytecount -= remaining;
   260             nextBuffer();
   320             nextBuffer();
   261         }
   321         }
   262     }
   322     }
   263 
   323 
   294     }
   354     }
   295 
   355 
   296     private Http2Frame parseDataFrame(int frameLength, int streamid, int flags) {
   356     private Http2Frame parseDataFrame(int frameLength, int streamid, int flags) {
   297         // non-zero stream
   357         // non-zero stream
   298         if (streamid == 0) {
   358         if (streamid == 0) {
   299             return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for DataFrame");
   359             return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
       
   360                                       "zero streamId for DataFrame");
   300         }
   361         }
   301         int padLength = 0;
   362         int padLength = 0;
   302         if ((flags & DataFrame.PADDED) != 0) {
   363         if ((flags & DataFrame.PADDED) != 0) {
   303             padLength = getByte();
   364             padLength = getByte();
   304             if(padLength >= frameLength) {
   365             if (padLength >= frameLength) {
   305                 return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
   366                 return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
   306                         "the length of the padding is the length of the frame payload or greater");
   367                         "the length of the padding is the length of the frame payload or greater");
   307             }
   368             }
   308             frameLength--;
   369             frameLength--;
   309         }
   370         }
   315     }
   376     }
   316 
   377 
   317     private Http2Frame parseHeadersFrame(int frameLength, int streamid, int flags) {
   378     private Http2Frame parseHeadersFrame(int frameLength, int streamid, int flags) {
   318         // non-zero stream
   379         // non-zero stream
   319         if (streamid == 0) {
   380         if (streamid == 0) {
   320             return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for HeadersFrame");
   381             return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
       
   382                                       "zero streamId for HeadersFrame");
   321         }
   383         }
   322         int padLength = 0;
   384         int padLength = 0;
   323         if ((flags & HeadersFrame.PADDED) != 0) {
   385         if ((flags & HeadersFrame.PADDED) != 0) {
   324             padLength = getByte();
   386             padLength = getByte();
   325             frameLength--;
   387             frameLength--;