57 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
57 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
58 static final AtomicLong IDS = new AtomicLong(); |
58 static final AtomicLong IDS = new AtomicLong(); |
59 |
59 |
60 private final HttpClientImpl client; |
60 private final HttpClientImpl client; |
61 private final SocketChannel channel; |
61 private final SocketChannel channel; |
62 private final Supplier<ByteBuffer> buffersSource; |
62 private final SliceBufferSource sliceBuffersSource; |
63 private final Object lock = new Object(); |
63 private final Object lock = new Object(); |
64 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
64 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
65 private final InternalReadPublisher readPublisher; |
65 private final InternalReadPublisher readPublisher; |
66 private final InternalWriteSubscriber writeSubscriber; |
66 private final InternalWriteSubscriber writeSubscriber; |
67 private final long id = IDS.incrementAndGet(); |
67 private final long id = IDS.incrementAndGet(); |
68 |
68 |
69 public SocketTube(HttpClientImpl client, SocketChannel channel, |
69 public SocketTube(HttpClientImpl client, SocketChannel channel, |
70 Supplier<ByteBuffer> buffersSource) { |
70 Supplier<ByteBuffer> buffersFactory) { |
71 this.client = client; |
71 this.client = client; |
72 this.channel = channel; |
72 this.channel = channel; |
73 this.buffersSource = buffersSource; |
73 this.sliceBuffersSource = new SliceBufferSource(buffersFactory); |
|
74 |
74 this.readPublisher = new InternalReadPublisher(); |
75 this.readPublisher = new InternalReadPublisher(); |
75 this.writeSubscriber = new InternalWriteSubscriber(); |
76 this.writeSubscriber = new InternalWriteSubscriber(); |
76 } |
77 } |
77 |
78 |
78 /** |
79 /** |
562 |
563 |
563 final class ReadSubscription implements Flow.Subscription { |
564 final class ReadSubscription implements Flow.Subscription { |
564 final InternalReadSubscription impl; |
565 final InternalReadSubscription impl; |
565 final TubeSubscriber subscriber; |
566 final TubeSubscriber subscriber; |
566 final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
567 final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
568 final BufferSource bufferSource; |
567 volatile boolean subscribed; |
569 volatile boolean subscribed; |
568 volatile boolean cancelled; |
570 volatile boolean cancelled; |
569 volatile boolean completed; |
571 volatile boolean completed; |
570 |
572 |
571 public ReadSubscription(InternalReadSubscription impl, |
573 public ReadSubscription(InternalReadSubscription impl, |
572 TubeSubscriber subscriber) { |
574 TubeSubscriber subscriber) { |
573 this.impl = impl; |
575 this.impl = impl; |
|
576 this.bufferSource = subscriber.supportsRecycling() |
|
577 ? new SSLDirectBufferSource(client) |
|
578 : SocketTube.this.sliceBuffersSource; |
574 this.subscriber = subscriber; |
579 this.subscriber = subscriber; |
575 } |
580 } |
576 |
581 |
577 @Override |
582 @Override |
578 public void cancel() { |
583 public void cancel() { |
904 Logger debug() { return debug; } |
909 Logger debug() { return debug; } |
905 } |
910 } |
906 } |
911 } |
907 |
912 |
908 // ===================================================================== // |
913 // ===================================================================== // |
|
914 // Buffer Management // |
|
915 // ===================================================================== // |
|
916 |
|
917 // This interface is used by readAvailable(BufferSource); |
|
918 public interface BufferSource { |
|
919 /** |
|
920 * Returns a buffer to read data from the socket. |
|
921 * Different implementation can have different strategies, as to |
|
922 * which kind of buffer to return, or whether to return the same |
|
923 * buffer. The only constraints are that |
|
924 * a. the buffer returned must not be null |
|
925 * b. the buffer position indicates where to start reading |
|
926 * c. the buffer limit indicates where to stop reading. |
|
927 * d. the buffer is 'free' - that is - it is not used |
|
928 * or retained by anybody else |
|
929 * @return A buffer to read data from the socket. |
|
930 */ |
|
931 ByteBuffer getBuffer(); |
|
932 |
|
933 /** |
|
934 * Append the data read into the buffer to the list of buffer to |
|
935 * be sent downstream to the subscriber. May return a new |
|
936 * list, or append to the given list. |
|
937 * |
|
938 * Different implementation can have different strategies, but |
|
939 * must obviously be consistent with the implementation of the |
|
940 * getBuffer() method. For instance, an implementation could |
|
941 * decide to add the buffer to the list and return a new buffer |
|
942 * next time getBuffer() is called, or could decide to add a buffer |
|
943 * slice to the list and return the same buffer (if remaining |
|
944 * space is available) next time getBuffer() is called. |
|
945 * |
|
946 * @param list The list before adding the data. Can be null. |
|
947 * @param buffer The buffer containing the data to add to the list. |
|
948 * @param start The start position at which data were read. |
|
949 * The current buffer position indicates the end. |
|
950 * @return A possibly new list where a buffer containing the |
|
951 * data read from the socket has been added. |
|
952 */ |
|
953 List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start); |
|
954 } |
|
955 |
|
956 // An implementation of BufferSource used for unencrypted data. |
|
957 // This buffer source uses heap buffers and avoids wasting memory |
|
958 // by forwarding read only buffer slices downstream. |
|
959 // Buffers allocated through this source are simply GC'ed when |
|
960 // they are no longer referenced. |
|
961 static final class SliceBufferSource implements BufferSource { |
|
962 private final Supplier<ByteBuffer> factory; |
|
963 private volatile ByteBuffer current; |
|
964 public SliceBufferSource() { |
|
965 this(Utils::getBuffer); |
|
966 } |
|
967 public SliceBufferSource(Supplier<ByteBuffer> factory) { |
|
968 this.factory = Objects.requireNonNull(factory); |
|
969 } |
|
970 |
|
971 // reuse the same buffer if some space remains available. |
|
972 // otherwise, returns a new heap buffer. |
|
973 @Override |
|
974 public final ByteBuffer getBuffer() { |
|
975 ByteBuffer buf = current; |
|
976 buf = (buf == null || !buf.hasRemaining()) |
|
977 ? (current = factory.get()) : buf; |
|
978 assert buf.hasRemaining(); |
|
979 return buf; |
|
980 } |
|
981 |
|
982 // Adds a read only slice to the list, potentially returning a |
|
983 // new list with with that slice at the end. |
|
984 @Override |
|
985 public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) { |
|
986 // creates a slice to add to the list |
|
987 int limit = buf.limit(); |
|
988 buf.limit(buf.position()); |
|
989 buf.position(start); |
|
990 ByteBuffer slice = buf.slice(); |
|
991 |
|
992 // restore buffer state to what it was before creating the slice |
|
993 buf.position(buf.limit()); |
|
994 buf.limit(limit); |
|
995 |
|
996 // add the buffer to the list |
|
997 return SocketTube.listOf(list, slice.asReadOnlyBuffer()); |
|
998 } |
|
999 } |
|
1000 |
|
1001 |
|
1002 // An implementation of BufferSource used for encrypted data. |
|
1003 // This buffer source use direct byte buffers that will be |
|
1004 // recycled by the SocketTube subscriber. |
|
1005 // |
|
1006 static final class SSLDirectBufferSource implements BufferSource { |
|
1007 private final Supplier<ByteBuffer> factory; |
|
1008 private final HttpClientImpl client; |
|
1009 private volatile ByteBuffer current; |
|
1010 |
|
1011 public SSLDirectBufferSource(HttpClientImpl client) { |
|
1012 this.client = Objects.requireNonNull(client); |
|
1013 this.factory = Objects.requireNonNull(client.getSSLBufferSupplier()); |
|
1014 } |
|
1015 |
|
1016 // Obtain a 'free' byte buffer from the pool, or return |
|
1017 // the same buffer if nothing was read at the previous cycle. |
|
1018 // The subscriber will be responsible for recycling this |
|
1019 // buffer into the pool (see SSLFlowDelegate.Reader) |
|
1020 @Override |
|
1021 public final ByteBuffer getBuffer() { |
|
1022 assert client.isSelectorThread(); |
|
1023 ByteBuffer buf = current; |
|
1024 if (buf == null) { |
|
1025 buf = current = factory.get(); |
|
1026 } |
|
1027 assert buf.hasRemaining(); |
|
1028 assert buf.position() == 0; |
|
1029 return buf; |
|
1030 } |
|
1031 |
|
1032 // Adds the buffer to the list. The buffer will be later returned to the |
|
1033 // pool by the subscriber (see SSLFlowDelegate.Reader). |
|
1034 // The next buffer returned by getBuffer() will be obtained from the |
|
1035 // pool. It might be the same buffer or another one. |
|
1036 // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because |
|
1037 // recycling will happen in the flow before onNext returns, then the |
|
1038 // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though |
|
1039 // it's shared by all SSL connections opened on that client. |
|
1040 @Override |
|
1041 public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) { |
|
1042 assert client.isSelectorThread(); |
|
1043 assert buf.isDirect(); |
|
1044 assert start == 0; |
|
1045 assert current == buf; |
|
1046 current = null; |
|
1047 buf.limit(buf.position()); |
|
1048 buf.position(start); |
|
1049 // add the buffer to the list |
|
1050 return SocketTube.listOf(list, buf); |
|
1051 } |
|
1052 } |
|
1053 |
|
1054 // ===================================================================== // |
909 // Socket Channel Read/Write // |
1055 // Socket Channel Read/Write // |
910 // ===================================================================== // |
1056 // ===================================================================== // |
911 static final int MAX_BUFFERS = 3; |
1057 static final int MAX_BUFFERS = 3; |
912 static final List<ByteBuffer> EOF = List.of(); |
1058 static final List<ByteBuffer> EOF = List.of(); |
913 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); |
1059 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); |
916 // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read(). |
1062 // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read(). |
917 // When that happens, a slice of the data that has been read so far |
1063 // When that happens, a slice of the data that has been read so far |
918 // is inserted into the returned buffer list, and if the current buffer |
1064 // is inserted into the returned buffer list, and if the current buffer |
919 // has remaining space, that space will be used to read more data when |
1065 // has remaining space, that space will be used to read more data when |
920 // the channel becomes readable again. |
1066 // the channel becomes readable again. |
921 private volatile ByteBuffer current; |
1067 private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException { |
922 private List<ByteBuffer> readAvailable() throws IOException { |
1068 ByteBuffer buf = buffersSource.getBuffer(); |
923 ByteBuffer buf = current; |
|
924 buf = (buf == null || !buf.hasRemaining()) |
|
925 ? (current = buffersSource.get()) : buf; |
|
926 assert buf.hasRemaining(); |
1069 assert buf.hasRemaining(); |
927 |
1070 |
928 int read; |
1071 int read; |
929 int pos = buf.position(); |
1072 int pos = buf.position(); |
930 List<ByteBuffer> list = null; |
1073 List<ByteBuffer> list = null; |
959 } |
1102 } |
960 |
1103 |
961 // check whether this buffer has still some free space available. |
1104 // check whether this buffer has still some free space available. |
962 // if so, we will keep it for the next round. |
1105 // if so, we will keep it for the next round. |
963 final boolean hasRemaining = buf.hasRemaining(); |
1106 final boolean hasRemaining = buf.hasRemaining(); |
964 |
1107 list = buffersSource.append(list, buf, pos); |
965 // creates a slice to add to the list |
1108 |
966 int limit = buf.limit(); |
|
967 buf.limit(buf.position()); |
|
968 buf.position(pos); |
|
969 ByteBuffer slice = buf.slice(); |
|
970 |
|
971 // restore buffer state to what it was before creating the slice |
|
972 buf.position(buf.limit()); |
|
973 buf.limit(limit); |
|
974 |
|
975 // add the buffer to the list |
|
976 list = addToList(list, slice.asReadOnlyBuffer()); |
|
977 if (read <= 0 || list.size() == MAX_BUFFERS) { |
1109 if (read <= 0 || list.size() == MAX_BUFFERS) { |
978 break; |
1110 break; |
979 } |
1111 } |
980 |
1112 |
981 buf = hasRemaining ? buf : (current = buffersSource.get()); |
1113 buf = buffersSource.getBuffer(); |
982 pos = buf.position(); |
1114 pos = buf.position(); |
983 assert buf.hasRemaining(); |
1115 assert buf.hasRemaining(); |
984 } |
1116 } |
985 return list; |
1117 return list; |
986 } |
1118 } |
987 |
1119 |
988 private <T> List<T> addToList(List<T> list, T item) { |
1120 private static <T> List<T> listOf(List<T> list, T item) { |
989 int size = list == null ? 0 : list.size(); |
1121 int size = list == null ? 0 : list.size(); |
990 switch (size) { |
1122 switch (size) { |
991 case 0: return List.of(item); |
1123 case 0: return List.of(item); |
992 case 1: return List.of(list.get(0), item); |
1124 case 1: return List.of(list.get(0), item); |
993 case 2: return List.of(list.get(0), list.get(1), item); |
1125 case 2: return List.of(list.get(0), list.get(1), item); |