899 // ===================================================================== // |
899 // ===================================================================== // |
900 // Socket Channel Read/Write // |
900 // Socket Channel Read/Write // |
901 // ===================================================================== // |
901 // ===================================================================== // |
902 static final int MAX_BUFFERS = 3; |
902 static final int MAX_BUFFERS = 3; |
903 static final List<ByteBuffer> EOF = List.of(); |
903 static final List<ByteBuffer> EOF = List.of(); |
904 |
904 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); |
|
905 |
|
906 // readAvailable() will read bytes into the 'current' ByteBuffer until |
|
907 // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read(). |
|
908 // When that happens, a slice of the data that has been read so far |
|
909 // is inserted into the returned buffer list, and if the current buffer |
|
910 // has remaining space, that space will be used to read more data when |
|
911 // the channel becomes readable again. |
|
912 private volatile ByteBuffer current; |
905 private List<ByteBuffer> readAvailable() throws IOException { |
913 private List<ByteBuffer> readAvailable() throws IOException { |
906 ByteBuffer buf = buffersSource.get(); |
914 ByteBuffer buf = current; |
|
915 buf = (buf == null || !buf.hasRemaining()) |
|
916 ? (current = buffersSource.get()) : buf; |
907 assert buf.hasRemaining(); |
917 assert buf.hasRemaining(); |
908 |
918 |
909 int read; |
919 int read; |
910 int pos = buf.position(); |
920 int pos = buf.position(); |
911 List<ByteBuffer> list = null; |
921 List<ByteBuffer> list = null; |
914 while ((read = channel.read(buf)) > 0) { |
924 while ((read = channel.read(buf)) > 0) { |
915 if (!buf.hasRemaining()) |
925 if (!buf.hasRemaining()) |
916 break; |
926 break; |
917 } |
927 } |
918 } catch (IOException x) { |
928 } catch (IOException x) { |
919 if (buf.position() == pos && (list == null || list.isEmpty())) { |
929 if (buf.position() == pos && list == null) { |
920 // no bytes have been read, just throw... |
930 // no bytes have been read, just throw... |
921 throw x; |
931 throw x; |
922 } else { |
932 } else { |
923 // some bytes have been read, return them and fail next time |
933 // some bytes have been read, return them and fail next time |
924 errorRef.compareAndSet(null, x); |
934 errorRef.compareAndSet(null, x); |
930 if (buf.position() == pos) { |
940 if (buf.position() == pos) { |
931 // An empty list signals the end of data, and should only be |
941 // An empty list signals the end of data, and should only be |
932 // returned if read == -1. If some data has already been read, |
942 // returned if read == -1. If some data has already been read, |
933 // then it must be returned. -1 will be returned next time |
943 // then it must be returned. -1 will be returned next time |
934 // the caller attempts to read something. |
944 // the caller attempts to read something. |
935 if (list == null && read == -1) { // eof |
945 if (list == null) { |
936 list = EOF; |
946 // nothing read - list was null - return EOF or NOTHING |
937 break; |
947 list = read == -1 ? EOF : NOTHING; |
938 } |
948 } |
939 } |
949 break; |
|
950 } |
|
951 |
|
952 // check whether this buffer has still some free space available. |
|
953 // if so, we will keep it for the next round. |
|
954 final boolean hasRemaining = buf.hasRemaining(); |
|
955 |
|
956 // creates a slice to add to the list |
|
957 int limit = buf.limit(); |
940 buf.limit(buf.position()); |
958 buf.limit(buf.position()); |
941 buf.position(pos); |
959 buf.position(pos); |
942 if (list == null) { |
960 ByteBuffer slice = buf.slice(); |
943 list = List.of(buf); |
961 |
944 } else { |
962 // restore buffer state to what it was before creating the slice |
945 if (!(list instanceof ArrayList)) { |
963 buf.position(buf.limit()); |
946 list = new ArrayList<>(list); |
964 buf.limit(limit); |
947 } |
965 |
948 list.add(buf); |
966 // add the buffer to the list |
949 } |
967 list = addToList(list, slice.asReadOnlyBuffer()); |
950 if (read <= 0 || list.size() == MAX_BUFFERS) { |
968 if (read <= 0 || list.size() == MAX_BUFFERS) { |
951 break; |
969 break; |
952 } |
970 } |
953 |
971 |
954 buf = buffersSource.get(); |
972 buf = hasRemaining ? buf : (current = buffersSource.get()); |
955 pos = buf.position(); |
973 pos = buf.position(); |
956 assert buf.hasRemaining(); |
974 assert buf.hasRemaining(); |
957 } |
975 } |
958 return list; |
976 return list; |
|
977 } |
|
978 |
|
979 private <T> List<T> addToList(List<T> list, T item) { |
|
980 int size = list == null ? 0 : list.size(); |
|
981 switch (size) { |
|
982 case 0: return List.of(item); |
|
983 case 1: return List.of(list.get(0), item); |
|
984 case 2: return List.of(list.get(0), list.get(1), item); |
|
985 default: // slow path if MAX_BUFFERS > 3 |
|
986 ArrayList<T> res = new ArrayList<>(list); |
|
987 res.add(item); |
|
988 return res; |
|
989 } |
959 } |
990 } |
960 |
991 |
961 private long writeAvailable(List<ByteBuffer> bytes) throws IOException { |
992 private long writeAvailable(List<ByteBuffer> bytes) throws IOException { |
962 ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY); |
993 ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY); |
963 final long remaining = Utils.remaining(srcs); |
994 final long remaining = Utils.remaining(srcs); |