57 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
57 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
58 static final AtomicLong IDS = new AtomicLong(); |
58 static final AtomicLong IDS = new AtomicLong(); |
59 |
59 |
60 private final HttpClientImpl client; |
60 private final HttpClientImpl client; |
61 private final SocketChannel channel; |
61 private final SocketChannel channel; |
62 private final Supplier<ByteBuffer> buffersSource; |
62 private final SliceBufferSource sliceBuffersSource; |
63 private final Object lock = new Object(); |
63 private final Object lock = new Object(); |
64 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
64 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
65 private final InternalReadPublisher readPublisher; |
65 private final InternalReadPublisher readPublisher; |
66 private final InternalWriteSubscriber writeSubscriber; |
66 private final InternalWriteSubscriber writeSubscriber; |
67 private final long id = IDS.incrementAndGet(); |
67 private final long id = IDS.incrementAndGet(); |
68 |
68 |
69 public SocketTube(HttpClientImpl client, SocketChannel channel, |
69 public SocketTube(HttpClientImpl client, SocketChannel channel, |
70 Supplier<ByteBuffer> buffersSource) { |
70 Supplier<ByteBuffer> buffersFactory) { |
71 this.client = client; |
71 this.client = client; |
72 this.channel = channel; |
72 this.channel = channel; |
73 this.buffersSource = buffersSource; |
73 this.sliceBuffersSource = new SliceBufferSource(buffersFactory); |
|
74 |
74 this.readPublisher = new InternalReadPublisher(); |
75 this.readPublisher = new InternalReadPublisher(); |
75 this.writeSubscriber = new InternalWriteSubscriber(); |
76 this.writeSubscriber = new InternalWriteSubscriber(); |
76 } |
77 } |
77 |
78 |
78 /** |
79 /** |
904 Logger debug() { return debug; } |
909 Logger debug() { return debug; } |
905 } |
910 } |
906 } |
911 } |
907 |
912 |
908 // ===================================================================== // |
913 // ===================================================================== // |
|
914 // Buffer Management // |
|
915 // ===================================================================== // |
|
916 |
|
917 // This interface is used by readAvailable(BufferSource); |
|
918 public interface BufferSource { |
|
919 /** |
|
920 * Returns a buffer to read data from the socket. |
|
921 * |
|
922 * @implNote |
|
923 * Different implementations can have different strategies, as to |
|
924 * which kind of buffer to return, or whether to return the same |
|
925 * buffer. The only constraints are that: |
|
926 * a. the buffer returned must not be null |
|
927 * b. the buffer position indicates where to start reading |
|
928 * c. the buffer limit indicates where to stop reading. |
|
929 * d. the buffer is 'free' - that is - it is not used |
|
930 * or retained by anybody else |
|
931 * |
|
932 * @return A buffer to read data from the socket. |
|
933 */ |
|
934 ByteBuffer getBuffer(); |
|
935 |
|
936 /** |
|
937 * Appends the read-data in {@code buffer} to the list of buffers to |
|
938 * be sent downstream to the subscriber. May return a new |
|
939 * list, or append to the given list. |
|
940 * |
|
941 * @implNote |
|
942 * Different implementations can have different strategies, but |
|
943 * must obviously be consistent with the implementation of the |
|
944 * getBuffer() method. For instance, an implementation could |
|
945 * decide to add the buffer to the list and return a new buffer |
|
946 * next time getBuffer() is called, or could decide to add a buffer |
|
947 * slice to the list and return the same buffer (if remaining |
|
948 * space is available) next time getBuffer() is called. |
|
949 * |
|
950 * @param list The list before adding the data. Can be null. |
|
951 * @param buffer The buffer containing the data to add to the list. |
|
952 * @param start The start position at which data were read. |
|
953 * The current buffer position indicates the end. |
|
954 * @return A possibly new list where a buffer containing the |
|
955 * data read from the socket has been added. |
|
956 */ |
|
957 List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start); |
|
958 |
|
959 /** |
|
960 * Returns the given unused {@code buffer}, previously obtained from |
|
961 * {@code getBuffer}. |
|
962 * |
|
963 * @implNote This method can be used, if necessary, to return |
|
964 * the unused buffer to the pool. |
|
965 * |
|
966 * @param buffer The unused buffer. |
|
967 */ |
|
968 default void returnUnused(ByteBuffer buffer) { } |
|
969 } |
|
970 |
|
971 // An implementation of BufferSource used for unencrypted data. |
|
972 // This buffer source uses heap buffers and avoids wasting memory |
|
973 // by forwarding read-only buffer slices downstream. |
|
974 // Buffers allocated through this source are simply GC'ed when |
|
975 // they are no longer referenced. |
|
976 private static final class SliceBufferSource implements BufferSource { |
|
977 private final Supplier<ByteBuffer> factory; |
|
978 private volatile ByteBuffer current; |
|
979 |
|
980 public SliceBufferSource() { |
|
981 this(Utils::getBuffer); |
|
982 } |
|
983 public SliceBufferSource(Supplier<ByteBuffer> factory) { |
|
984 this.factory = Objects.requireNonNull(factory); |
|
985 } |
|
986 |
|
987 // Reuses the same buffer if some space remains available. |
|
988 // Otherwise, returns a new heap buffer. |
|
989 @Override |
|
990 public final ByteBuffer getBuffer() { |
|
991 ByteBuffer buf = current; |
|
992 buf = (buf == null || !buf.hasRemaining()) |
|
993 ? (current = factory.get()) : buf; |
|
994 assert buf.hasRemaining(); |
|
995 return buf; |
|
996 } |
|
997 |
|
998 // Adds a read-only slice to the list, potentially returning a |
|
999 // new list with that slice at the end. |
|
1000 @Override |
|
1001 public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) { |
|
1002 // creates a slice to add to the list |
|
1003 int limit = buf.limit(); |
|
1004 buf.limit(buf.position()); |
|
1005 buf.position(start); |
|
1006 ByteBuffer slice = buf.slice(); |
|
1007 |
|
1008 // restore buffer state to what it was before creating the slice |
|
1009 buf.position(buf.limit()); |
|
1010 buf.limit(limit); |
|
1011 |
|
1012 // add the buffer to the list |
|
1013 return SocketTube.listOf(list, slice.asReadOnlyBuffer()); |
|
1014 } |
|
1015 } |
|
1016 |
|
1017 |
|
1018 // An implementation of BufferSource used for encrypted data. |
|
1019 // This buffer source uses direct byte buffers that will be |
|
1020 // recycled by the SocketTube subscriber. |
|
1021 // |
|
1022 private static final class SSLDirectBufferSource implements BufferSource { |
|
1023 private final BufferSupplier factory; |
|
1024 private final HttpClientImpl client; |
|
1025 private ByteBuffer current; |
|
1026 |
|
1027 public SSLDirectBufferSource(HttpClientImpl client) { |
|
1028 this.client = Objects.requireNonNull(client); |
|
1029 this.factory = Objects.requireNonNull(client.getSSLBufferSupplier()); |
|
1030 } |
|
1031 |
|
1032 // Obtains a 'free' byte buffer from the pool, or returns |
|
1033 // the same buffer if nothing was read at the previous cycle. |
|
1034 // The subscriber will be responsible for recycling this |
|
1035 // buffer into the pool (see SSLFlowDelegate.Reader) |
|
1036 @Override |
|
1037 public final ByteBuffer getBuffer() { |
|
1038 assert client.isSelectorThread(); |
|
1039 ByteBuffer buf = current; |
|
1040 if (buf == null) { |
|
1041 buf = current = factory.get(); |
|
1042 } |
|
1043 assert buf.hasRemaining(); |
|
1044 assert buf.position() == 0; |
|
1045 return buf; |
|
1046 } |
|
1047 |
|
1048 // Adds the buffer to the list. The buffer will be later returned to the |
|
1049 // pool by the subscriber (see SSLFlowDelegate.Reader). |
|
1050 // The next buffer returned by getBuffer() will be obtained from the |
|
1051 // pool. It might be the same buffer or another one. |
|
1052 // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because |
|
1053 // recycling will happen in the flow before onNext returns, then the |
|
1054 // pool cannot grow larger than MAX_BUFFERS = 3 buffers, even though |
|
1055 // it's shared by all SSL connections opened on that client. |
|
1056 @Override |
|
1057 public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) { |
|
1058 assert client.isSelectorThread(); |
|
1059 assert buf.isDirect(); |
|
1060 assert start == 0; |
|
1061 assert current == buf; |
|
1062 current = null; |
|
1063 buf.limit(buf.position()); |
|
1064 buf.position(start); |
|
1065 // add the buffer to the list |
|
1066 return SocketTube.listOf(list, buf); |
|
1067 } |
|
1068 |
|
1069 @Override |
|
1070 public void returnUnused(ByteBuffer buffer) { |
|
1071 // if current is null, then the buffer will have been added to the |
|
1072 // list, through append. Otherwise, current is not null, and needs |
|
1073 // to be returned to prevent the buffer supplier pool from growing |
|
1074 // to more than MAX_BUFFERS. |
|
1075 assert buffer == current; |
|
1076 ByteBuffer buf = current; |
|
1077 if (buf != null) { |
|
1078 assert buf.position() == 0; |
|
1079 current = null; |
|
1080 // the supplier asserts if buf has remaining, so empty it first |
|
1081 buf.limit(buf.position()); |
|
1082 factory.recycle(buf); |
|
1083 } |
|
1084 } |
|
1085 } |
|
1086 |
|
1087 // ===================================================================== // |
909 // Socket Channel Read/Write // |
1088 // Socket Channel Read/Write // |
910 // ===================================================================== // |
1089 // ===================================================================== // |
911 static final int MAX_BUFFERS = 3; |
1090 static final int MAX_BUFFERS = 3; |
912 static final List<ByteBuffer> EOF = List.of(); |
1091 static final List<ByteBuffer> EOF = List.of(); |
913 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); |
1092 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); |
949 if (buf.position() == pos) { |
1128 if (buf.position() == pos) { |
950 // An empty list signals the end of data, and should only be |
1129 // An empty list signals the end of data, and should only be |
951 // returned if read == -1. If some data has already been read, |
1130 // returned if read == -1. If some data has already been read, |
952 // then it must be returned. -1 will be returned next time |
1131 // then it must be returned. -1 will be returned next time |
953 // the caller attempts to read something. |
1132 // the caller attempts to read something. |
|
1133 buffersSource.returnUnused(buf); |
954 if (list == null) { |
1134 if (list == null) { |
955 // nothing read - list was null - return EOF or NOTHING |
1135 // nothing read - list was null - return EOF or NOTHING |
956 list = read == -1 ? EOF : NOTHING; |
1136 list = read == -1 ? EOF : NOTHING; |
957 } |
1137 } |
958 break; |
1138 break; |
959 } |
1139 } |
960 |
1140 |
961 // check whether this buffer has still some free space available. |
1141 // append the data just read to the list; the buffer source decides |
962 // if so, we will keep it for the next round. |
1142 // whether this buffer is reused for the next round. |
963 final boolean hasRemaining = buf.hasRemaining(); |
1143 list = buffersSource.append(list, buf, pos); |
964 |
1144 |
965 // creates a slice to add to the list |
|
966 int limit = buf.limit(); |
|
967 buf.limit(buf.position()); |
|
968 buf.position(pos); |
|
969 ByteBuffer slice = buf.slice(); |
|
970 |
|
971 // restore buffer state to what it was before creating the slice |
|
972 buf.position(buf.limit()); |
|
973 buf.limit(limit); |
|
974 |
|
975 // add the buffer to the list |
|
976 list = addToList(list, slice.asReadOnlyBuffer()); |
|
977 if (read <= 0 || list.size() == MAX_BUFFERS) { |
1145 if (read <= 0 || list.size() == MAX_BUFFERS) { |
978 break; |
1146 break; |
979 } |
1147 } |
980 |
1148 |
981 buf = hasRemaining ? buf : (current = buffersSource.get()); |
1149 buf = buffersSource.getBuffer(); |
982 pos = buf.position(); |
1150 pos = buf.position(); |
983 assert buf.hasRemaining(); |
1151 assert buf.hasRemaining(); |
984 } |
1152 } |
985 return list; |
1153 return list; |
986 } |
1154 } |
987 |
1155 |
988 private <T> List<T> addToList(List<T> list, T item) { |
1156 private static <T> List<T> listOf(List<T> list, T item) { |
989 int size = list == null ? 0 : list.size(); |
1157 int size = list == null ? 0 : list.size(); |
990 switch (size) { |
1158 switch (size) { |
991 case 0: return List.of(item); |
1159 case 0: return List.of(item); |
992 case 1: return List.of(list.get(0), item); |
1160 case 1: return List.of(list.get(0), item); |
993 case 2: return List.of(list.get(0), list.get(1), item); |
1161 case 2: return List.of(list.get(0), list.get(1), item); |
994 default: // slow path if MAX_BUFFERS > 3 |
1162 default: // slow path if MAX_BUFFERS > 3 |
995 ArrayList<T> res = new ArrayList<>(list); |
1163 List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list); |
996 res.add(item); |
1164 res.add(item); |
997 return res; |
1165 return res; |
998 } |
1166 } |
999 } |
1167 } |
1000 |
1168 |