src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
changeset 49944 4690a2871b44
parent 49765 ee6f7a61f3a5
child 50681 4254bed3c09d
child 56507 2294c51eae30
equal deleted inserted replaced
49943:8e1ed2a15845 49944:4690a2871b44
    24  */
    24  */
    25 
    25 
    26 package jdk.internal.net.http;
    26 package jdk.internal.net.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
    29 import java.nio.ByteBuffer;
    31 import java.util.List;
    30 import java.util.List;
    32 import java.util.Objects;
    31 import java.util.Objects;
    33 import java.util.concurrent.Flow;
    32 import java.util.concurrent.Flow;
    34 import java.util.concurrent.atomic.AtomicLong;
    33 import java.util.concurrent.atomic.AtomicLong;
    37 import java.nio.channels.SelectionKey;
    36 import java.nio.channels.SelectionKey;
    38 import java.nio.channels.SocketChannel;
    37 import java.nio.channels.SocketChannel;
    39 import java.util.ArrayList;
    38 import java.util.ArrayList;
    40 import java.util.function.Consumer;
    39 import java.util.function.Consumer;
    41 import java.util.function.Supplier;
    40 import java.util.function.Supplier;
       
    41 import jdk.internal.net.http.common.BufferSupplier;
    42 import jdk.internal.net.http.common.Demand;
    42 import jdk.internal.net.http.common.Demand;
    43 import jdk.internal.net.http.common.FlowTube;
    43 import jdk.internal.net.http.common.FlowTube;
    44 import jdk.internal.net.http.common.Logger;
    44 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.SequentialScheduler;
    45 import jdk.internal.net.http.common.SequentialScheduler;
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    57     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    57     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    58     static final AtomicLong IDS = new AtomicLong();
    58     static final AtomicLong IDS = new AtomicLong();
    59 
    59 
    60     private final HttpClientImpl client;
    60     private final HttpClientImpl client;
    61     private final SocketChannel channel;
    61     private final SocketChannel channel;
    62     private final Supplier<ByteBuffer> buffersSource;
    62     private final SliceBufferSource sliceBuffersSource;
    63     private final Object lock = new Object();
    63     private final Object lock = new Object();
    64     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    64     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    65     private final InternalReadPublisher readPublisher;
    65     private final InternalReadPublisher readPublisher;
    66     private final InternalWriteSubscriber writeSubscriber;
    66     private final InternalWriteSubscriber writeSubscriber;
    67     private final long id = IDS.incrementAndGet();
    67     private final long id = IDS.incrementAndGet();
    68 
    68 
    69     public SocketTube(HttpClientImpl client, SocketChannel channel,
    69     public SocketTube(HttpClientImpl client, SocketChannel channel,
    70                       Supplier<ByteBuffer> buffersSource) {
    70                       Supplier<ByteBuffer> buffersFactory) {
    71         this.client = client;
    71         this.client = client;
    72         this.channel = channel;
    72         this.channel = channel;
    73         this.buffersSource = buffersSource;
    73         this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
       
    74 
    74         this.readPublisher = new InternalReadPublisher();
    75         this.readPublisher = new InternalReadPublisher();
    75         this.writeSubscriber = new InternalWriteSubscriber();
    76         this.writeSubscriber = new InternalWriteSubscriber();
    76     }
    77     }
    77 
    78 
    78     /**
    79     /**
   562 
   563 
   563         final class ReadSubscription implements Flow.Subscription {
   564         final class ReadSubscription implements Flow.Subscription {
   564             final InternalReadSubscription impl;
   565             final InternalReadSubscription impl;
   565             final TubeSubscriber  subscriber;
   566             final TubeSubscriber  subscriber;
   566             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
   567             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   568             final BufferSource bufferSource;
   567             volatile boolean subscribed;
   569             volatile boolean subscribed;
   568             volatile boolean cancelled;
   570             volatile boolean cancelled;
   569             volatile boolean completed;
   571             volatile boolean completed;
   570 
   572 
   571             public ReadSubscription(InternalReadSubscription impl,
   573             public ReadSubscription(InternalReadSubscription impl,
   572                                     TubeSubscriber subscriber) {
   574                                     TubeSubscriber subscriber) {
   573                 this.impl = impl;
   575                 this.impl = impl;
       
   576                 this.bufferSource = subscriber.supportsRecycling()
       
   577                         ? new SSLDirectBufferSource(client)
       
   578                         : SocketTube.this.sliceBuffersSource;
   574                 this.subscriber = subscriber;
   579                 this.subscriber = subscriber;
   575             }
   580             }
   576 
   581 
   577             @Override
   582             @Override
   578             public void cancel() {
   583             public void cancel() {
   777                         // If we reach here then we must be in the selector thread.
   782                         // If we reach here then we must be in the selector thread.
   778                         assert client.isSelectorThread();
   783                         assert client.isSelectorThread();
   779                         if (demand.tryDecrement()) {
   784                         if (demand.tryDecrement()) {
   780                             // we have demand.
   785                             // we have demand.
   781                             try {
   786                             try {
   782                                 List<ByteBuffer> bytes = readAvailable();
   787                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   783                                 if (bytes == EOF) {
   788                                 if (bytes == EOF) {
   784                                     if (!completed) {
   789                                     if (!completed) {
   785                                         if (debug.on()) debug.log("got read EOF");
   790                                         if (debug.on()) debug.log("got read EOF");
   786                                         completed = true;
   791                                         completed = true;
   787                                         // safe to pause here because we're finished
   792                                         // safe to pause here because we're finished
   904             Logger debug() { return debug; }
   909             Logger debug() { return debug; }
   905         }
   910         }
   906     }
   911     }
   907 
   912 
   908     // ===================================================================== //
   913     // ===================================================================== //
       
   914     //                       Buffer Management                               //
       
   915     // ===================================================================== //
       
   916 
       
   917     // This interface is used by readAvailable(BufferSource);
       
   918     public interface BufferSource {
       
   919         /**
       
   920          * Returns a buffer to read data from the socket.
       
   921          *
       
   922          * @implNote
       
   923          * Different implementation can have different strategies, as to
       
   924          * which kind of buffer to return, or whether to return the same
       
   925          * buffer. The only constraints are that:
       
   926          *   a. the buffer returned must not be null
       
   927          *   b. the buffer position indicates where to start reading
       
   928          *   c. the buffer limit indicates where to stop reading.
       
   929          *   d. the buffer is 'free' - that is - it is not used
       
   930          *      or retained by anybody else
       
   931          *
       
   932          * @return A buffer to read data from the socket.
       
   933          */
       
   934         ByteBuffer getBuffer();
       
   935 
       
   936         /**
       
   937          * Appends the read-data in {@code buffer} to the list of buffer to
       
   938          * be sent downstream to the subscriber. May return a new
       
   939          * list, or append to the given list.
       
   940          *
       
   941          * @implNote
       
   942          * Different implementation can have different strategies, but
       
   943          * must obviously be consistent with the implementation of the
       
   944          * getBuffer() method. For instance, an implementation could
       
   945          * decide to add the buffer to the list and return a new buffer
       
   946          * next time getBuffer() is called, or could decide to add a buffer
       
   947          * slice to the list and return the same buffer (if remaining
       
   948          * space is available) next time getBuffer() is called.
       
   949          *
       
   950          * @param list    The list before adding the data. Can be null.
       
   951          * @param buffer  The buffer containing the data to add to the list.
       
   952          * @param start   The start position at which data were read.
       
   953          *                The current buffer position indicates the end.
       
   954          * @return A possibly new list where a buffer containing the
       
   955          *         data read from the socket has been added.
       
   956          */
       
   957         List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);
       
   958 
       
   959         /**
       
   960          * Returns the given unused {@code buffer}, previously obtained from
       
   961          * {@code getBuffer}.
       
   962          *
       
   963          * @implNote This method can be used, if necessary, to return
       
   964          *  the unused buffer to the pull.
       
   965          *
       
   966          * @param buffer The unused buffer.
       
   967          */
       
   968         default void returnUnused(ByteBuffer buffer) { }
       
   969     }
       
   970 
       
   971     // An implementation of BufferSource used for unencrypted data.
       
   972     // This buffer source uses heap buffers and avoids wasting memory
       
   973     // by forwarding read-only buffer slices downstream.
       
   974     // Buffers allocated through this source are simply GC'ed when
       
   975     // they are no longer referenced.
       
   976     private static final class SliceBufferSource implements BufferSource {
       
   977         private final Supplier<ByteBuffer> factory;
       
   978         private volatile ByteBuffer current;
       
   979 
       
   980         public SliceBufferSource() {
       
   981             this(Utils::getBuffer);
       
   982         }
       
   983         public SliceBufferSource(Supplier<ByteBuffer> factory) {
       
   984             this.factory = Objects.requireNonNull(factory);
       
   985         }
       
   986 
       
   987         // Reuses the same buffer if some space remains available.
       
   988         // Otherwise, returns a new heap buffer.
       
   989         @Override
       
   990         public final ByteBuffer getBuffer() {
       
   991             ByteBuffer buf = current;
       
   992             buf = (buf == null || !buf.hasRemaining())
       
   993                     ? (current = factory.get()) : buf;
       
   994             assert buf.hasRemaining();
       
   995             return buf;
       
   996         }
       
   997 
       
   998         // Adds a read-only slice to the list, potentially returning a
       
   999         // new list with that slice at the end.
       
  1000         @Override
       
  1001         public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
       
  1002             // creates a slice to add to the list
       
  1003             int limit = buf.limit();
       
  1004             buf.limit(buf.position());
       
  1005             buf.position(start);
       
  1006             ByteBuffer slice = buf.slice();
       
  1007 
       
  1008             // restore buffer state to what it was before creating the slice
       
  1009             buf.position(buf.limit());
       
  1010             buf.limit(limit);
       
  1011 
       
  1012             // add the buffer to the list
       
  1013             return SocketTube.listOf(list, slice.asReadOnlyBuffer());
       
  1014         }
       
  1015     }
       
  1016 
       
  1017 
       
  1018     // An implementation of BufferSource used for encrypted data.
       
  1019     // This buffer source uses direct byte buffers that will be
       
  1020     // recycled by the SocketTube subscriber.
       
  1021     //
       
  1022     private static final class SSLDirectBufferSource implements BufferSource {
       
  1023         private final BufferSupplier factory;
       
  1024         private final HttpClientImpl client;
       
  1025         private ByteBuffer current;
       
  1026 
       
  1027         public SSLDirectBufferSource(HttpClientImpl client) {
       
  1028             this.client = Objects.requireNonNull(client);
       
  1029             this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
       
  1030         }
       
  1031 
       
  1032         // Obtains a 'free' byte buffer from the pool, or returns
       
  1033         // the same buffer if nothing was read at the previous cycle.
       
  1034         // The subscriber will be responsible for recycling this
       
  1035         // buffer into the pool (see SSLFlowDelegate.Reader)
       
  1036         @Override
       
  1037         public final ByteBuffer getBuffer() {
       
  1038             assert client.isSelectorThread();
       
  1039             ByteBuffer buf = current;
       
  1040             if (buf == null) {
       
  1041                 buf = current = factory.get();
       
  1042             }
       
  1043             assert buf.hasRemaining();
       
  1044             assert buf.position() == 0;
       
  1045             return buf;
       
  1046         }
       
  1047 
       
  1048         // Adds the buffer to the list. The buffer will be later returned to the
       
  1049         // pool by the subscriber (see SSLFlowDelegate.Reader).
       
  1050         // The next buffer returned by getBuffer() will be obtained from the
       
  1051         // pool. It might be the same buffer or another one.
       
  1052         // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
       
  1053         // recycling will happen in the flow before onNext returns, then the
       
  1054         // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
       
  1055         // it's shared by all SSL connections opened on that client.
       
  1056         @Override
       
  1057         public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
       
  1058             assert client.isSelectorThread();
       
  1059             assert buf.isDirect();
       
  1060             assert start == 0;
       
  1061             assert current == buf;
       
  1062             current = null;
       
  1063             buf.limit(buf.position());
       
  1064             buf.position(start);
       
  1065             // add the buffer to the list
       
  1066             return SocketTube.listOf(list, buf);
       
  1067         }
       
  1068 
       
  1069         @Override
       
  1070         public void returnUnused(ByteBuffer buffer) {
       
  1071             // if current is null, then the buffer will have been added to the
       
  1072             // list, through append. Otherwise, current is not null, and needs
       
  1073             // to be returned to prevent the buffer supplier pool from growing
       
  1074             // to more than MAX_BUFFERS.
       
  1075             assert buffer == current;
       
  1076             ByteBuffer buf = current;
       
  1077             if (buf != null) {
       
  1078                 assert buf.position() == 0;
       
  1079                 current = null;
       
  1080                 // the supplier assert if buf has remaining
       
  1081                 buf.limit(buf.position());
       
  1082                 factory.recycle(buf);
       
  1083             }
       
  1084         }
       
  1085     }
       
  1086 
       
  1087     // ===================================================================== //
   909     //                   Socket Channel Read/Write                           //
  1088     //                   Socket Channel Read/Write                           //
   910     // ===================================================================== //
  1089     // ===================================================================== //
   911     static final int MAX_BUFFERS = 3;
  1090     static final int MAX_BUFFERS = 3;
   912     static final List<ByteBuffer> EOF = List.of();
  1091     static final List<ByteBuffer> EOF = List.of();
   913     static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
  1092     static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
   916     // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
  1095     // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
   917     // When that happens, a slice of the data that has been read so far
  1096     // When that happens, a slice of the data that has been read so far
   918     // is inserted into the returned buffer list, and if the current buffer
  1097     // is inserted into the returned buffer list, and if the current buffer
   919     // has remaining space, that space will be used to read more data when
  1098     // has remaining space, that space will be used to read more data when
   920     // the channel becomes readable again.
  1099     // the channel becomes readable again.
   921     private volatile ByteBuffer current;
  1100     private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
   922     private List<ByteBuffer> readAvailable() throws IOException {
  1101         ByteBuffer buf = buffersSource.getBuffer();
   923         ByteBuffer buf = current;
       
   924         buf = (buf == null || !buf.hasRemaining())
       
   925                 ? (current = buffersSource.get()) : buf;
       
   926         assert buf.hasRemaining();
  1102         assert buf.hasRemaining();
   927 
  1103 
   928         int read;
  1104         int read;
   929         int pos = buf.position();
  1105         int pos = buf.position();
   930         List<ByteBuffer> list = null;
  1106         List<ByteBuffer> list = null;
   934                     if (!buf.hasRemaining())
  1110                     if (!buf.hasRemaining())
   935                         break;
  1111                         break;
   936                 }
  1112                 }
   937             } catch (IOException x) {
  1113             } catch (IOException x) {
   938                 if (buf.position() == pos && list == null) {
  1114                 if (buf.position() == pos && list == null) {
       
  1115                     // make sure that the buffer source will recycle
       
  1116                     // 'buf' if needed
       
  1117                     buffersSource.returnUnused(buf);
   939                     // no bytes have been read, just throw...
  1118                     // no bytes have been read, just throw...
   940                     throw x;
  1119                     throw x;
   941                 } else {
  1120                 } else {
   942                     // some bytes have been read, return them and fail next time
  1121                     // some bytes have been read, return them and fail next time
   943                     errorRef.compareAndSet(null, x);
  1122                     errorRef.compareAndSet(null, x);
   949             if (buf.position() == pos) {
  1128             if (buf.position() == pos) {
   950                 // An empty list signals the end of data, and should only be
  1129                 // An empty list signals the end of data, and should only be
   951                 // returned if read == -1. If some data has already been read,
  1130                 // returned if read == -1. If some data has already been read,
   952                 // then it must be returned. -1 will be returned next time
  1131                 // then it must be returned. -1 will be returned next time
   953                 // the caller attempts to read something.
  1132                 // the caller attempts to read something.
       
  1133                 buffersSource.returnUnused(buf);
   954                 if (list == null) {
  1134                 if (list == null) {
   955                     // nothing read - list was null - return EOF or NOTHING
  1135                     // nothing read - list was null - return EOF or NOTHING
   956                     list = read == -1 ? EOF : NOTHING;
  1136                     list = read == -1 ? EOF : NOTHING;
   957                 }
  1137                 }
   958                 break;
  1138                 break;
   959             }
  1139             }
   960 
  1140 
   961             // check whether this buffer has still some free space available.
  1141             // check whether this buffer has still some free space available.
   962             // if so, we will keep it for the next round.
  1142             // if so, we will keep it for the next round.
   963             final boolean hasRemaining = buf.hasRemaining();
  1143             list = buffersSource.append(list, buf, pos);
   964 
  1144 
   965             // creates a slice to add to the list
       
   966             int limit = buf.limit();
       
   967             buf.limit(buf.position());
       
   968             buf.position(pos);
       
   969             ByteBuffer slice = buf.slice();
       
   970 
       
   971             // restore buffer state to what it was before creating the slice
       
   972             buf.position(buf.limit());
       
   973             buf.limit(limit);
       
   974 
       
   975             // add the buffer to the list
       
   976             list = addToList(list, slice.asReadOnlyBuffer());
       
   977             if (read <= 0 || list.size() == MAX_BUFFERS) {
  1145             if (read <= 0 || list.size() == MAX_BUFFERS) {
   978                 break;
  1146                 break;
   979             }
  1147             }
   980 
  1148 
   981             buf = hasRemaining ? buf : (current = buffersSource.get());
  1149             buf = buffersSource.getBuffer();
   982             pos = buf.position();
  1150             pos = buf.position();
   983             assert buf.hasRemaining();
  1151             assert buf.hasRemaining();
   984         }
  1152         }
   985         return list;
  1153         return list;
   986     }
  1154     }
   987 
  1155 
   988     private <T> List<T> addToList(List<T> list, T item) {
  1156     private static <T> List<T> listOf(List<T> list, T item) {
   989         int size = list == null ? 0 : list.size();
  1157         int size = list == null ? 0 : list.size();
   990         switch (size) {
  1158         switch (size) {
   991             case 0: return List.of(item);
  1159             case 0: return List.of(item);
   992             case 1: return List.of(list.get(0), item);
  1160             case 1: return List.of(list.get(0), item);
   993             case 2: return List.of(list.get(0), list.get(1), item);
  1161             case 2: return List.of(list.get(0), list.get(1), item);
   994             default: // slow path if MAX_BUFFERS > 3
  1162             default: // slow path if MAX_BUFFERS > 3
   995                 ArrayList<T> res = new ArrayList<>(list);
  1163                 List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
   996                 res.add(item);
  1164                 res.add(item);
   997                 return res;
  1165                 return res;
   998         }
  1166         }
   999     }
  1167     }
  1000 
  1168