src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
branchhttp-client-branch
changeset 56474 fe2bf7b369b8
parent 56463 b583caf69b39
child 56481 247ed0848e48
equal deleted inserted replaced
56463:b583caf69b39 56474:fe2bf7b369b8
    24  */
    24  */
    25 
    25 
    26 package jdk.internal.net.http;
    26 package jdk.internal.net.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
    29 import java.nio.ByteBuffer;
    31 import java.util.List;
    30 import java.util.List;
    32 import java.util.Objects;
    31 import java.util.Objects;
    33 import java.util.concurrent.Flow;
    32 import java.util.concurrent.Flow;
    34 import java.util.concurrent.atomic.AtomicLong;
    33 import java.util.concurrent.atomic.AtomicLong;
    37 import java.nio.channels.SelectionKey;
    36 import java.nio.channels.SelectionKey;
    38 import java.nio.channels.SocketChannel;
    37 import java.nio.channels.SocketChannel;
    39 import java.util.ArrayList;
    38 import java.util.ArrayList;
    40 import java.util.function.Consumer;
    39 import java.util.function.Consumer;
    41 import java.util.function.Supplier;
    40 import java.util.function.Supplier;
       
    41 import jdk.internal.net.http.common.BufferSupplier;
    42 import jdk.internal.net.http.common.Demand;
    42 import jdk.internal.net.http.common.Demand;
    43 import jdk.internal.net.http.common.FlowTube;
    43 import jdk.internal.net.http.common.FlowTube;
    44 import jdk.internal.net.http.common.Logger;
    44 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.SequentialScheduler;
    45 import jdk.internal.net.http.common.SequentialScheduler;
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    57     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    57     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    58     static final AtomicLong IDS = new AtomicLong();
    58     static final AtomicLong IDS = new AtomicLong();
    59 
    59 
    60     private final HttpClientImpl client;
    60     private final HttpClientImpl client;
    61     private final SocketChannel channel;
    61     private final SocketChannel channel;
    62     private final Supplier<ByteBuffer> buffersSource;
    62     private final SliceBufferSource sliceBuffersSource;
    63     private final Object lock = new Object();
    63     private final Object lock = new Object();
    64     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    64     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    65     private final InternalReadPublisher readPublisher;
    65     private final InternalReadPublisher readPublisher;
    66     private final InternalWriteSubscriber writeSubscriber;
    66     private final InternalWriteSubscriber writeSubscriber;
    67     private final long id = IDS.incrementAndGet();
    67     private final long id = IDS.incrementAndGet();
    68 
    68 
    69     public SocketTube(HttpClientImpl client, SocketChannel channel,
    69     public SocketTube(HttpClientImpl client, SocketChannel channel,
    70                       Supplier<ByteBuffer> buffersSource) {
    70                       Supplier<ByteBuffer> buffersFactory) {
    71         this.client = client;
    71         this.client = client;
    72         this.channel = channel;
    72         this.channel = channel;
    73         this.buffersSource = buffersSource;
    73         this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
       
    74 
    74         this.readPublisher = new InternalReadPublisher();
    75         this.readPublisher = new InternalReadPublisher();
    75         this.writeSubscriber = new InternalWriteSubscriber();
    76         this.writeSubscriber = new InternalWriteSubscriber();
    76     }
    77     }
    77 
    78 
    78     /**
    79     /**
   562 
   563 
   563         final class ReadSubscription implements Flow.Subscription {
   564         final class ReadSubscription implements Flow.Subscription {
   564             final InternalReadSubscription impl;
   565             final InternalReadSubscription impl;
   565             final TubeSubscriber  subscriber;
   566             final TubeSubscriber  subscriber;
   566             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
   567             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   568             final BufferSource bufferSource;
   567             volatile boolean subscribed;
   569             volatile boolean subscribed;
   568             volatile boolean cancelled;
   570             volatile boolean cancelled;
   569             volatile boolean completed;
   571             volatile boolean completed;
   570 
   572 
   571             public ReadSubscription(InternalReadSubscription impl,
   573             public ReadSubscription(InternalReadSubscription impl,
   572                                     TubeSubscriber subscriber) {
   574                                     TubeSubscriber subscriber) {
   573                 this.impl = impl;
   575                 this.impl = impl;
       
   576                 this.bufferSource = subscriber.supportsRecycling()
       
   577                         ? new SSLDirectBufferSource(client)
       
   578                         : SocketTube.this.sliceBuffersSource;
   574                 this.subscriber = subscriber;
   579                 this.subscriber = subscriber;
   575             }
   580             }
   576 
   581 
   577             @Override
   582             @Override
   578             public void cancel() {
   583             public void cancel() {
   777                         // If we reach here then we must be in the selector thread.
   782                         // If we reach here then we must be in the selector thread.
   778                         assert client.isSelectorThread();
   783                         assert client.isSelectorThread();
   779                         if (demand.tryDecrement()) {
   784                         if (demand.tryDecrement()) {
   780                             // we have demand.
   785                             // we have demand.
   781                             try {
   786                             try {
   782                                 List<ByteBuffer> bytes = readAvailable();
   787                                 List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
   783                                 if (bytes == EOF) {
   788                                 if (bytes == EOF) {
   784                                     if (!completed) {
   789                                     if (!completed) {
   785                                         if (debug.on()) debug.log("got read EOF");
   790                                         if (debug.on()) debug.log("got read EOF");
   786                                         completed = true;
   791                                         completed = true;
   787                                         // safe to pause here because we're finished
   792                                         // safe to pause here because we're finished
   904             Logger debug() { return debug; }
   909             Logger debug() { return debug; }
   905         }
   910         }
   906     }
   911     }
   907 
   912 
   908     // ===================================================================== //
   913     // ===================================================================== //
       
   914     //                       Buffer Management                               //
       
   915     // ===================================================================== //
       
   916 
       
   917     // This interface is used by readAvailable(BufferSource);
       
   918     public interface BufferSource {
       
   919         /**
       
   920          * Returns a buffer to read data from the socket.
       
   921          * Different implementation can have different strategies, as to
       
   922          * which kind of buffer to return, or whether to return the same
       
   923          * buffer. The only constraints are that
       
   924          *   a. the buffer returned must not be null
       
   925          *   b. the buffer position indicates where to start reading
       
   926          *   c. the buffer limit indicates where to stop reading.
       
   927          *   d. the buffer is 'free' - that is - it is not used
       
   928          *      or retained by anybody else
       
   929          * @return A buffer to read data from the socket.
       
   930          */
       
   931         ByteBuffer getBuffer();
       
   932 
       
   933         /**
       
   934          * Append the data read into the buffer to the list of buffer to
       
   935          * be sent downstream to the subscriber. May return a new
       
   936          * list, or append to the given list.
       
   937          *
       
   938          * Different implementation can have different strategies, but
       
   939          * must obviously be consistent with the implementation of the
       
   940          * getBuffer() method. For instance, an implementation could
       
   941          * decide to add the buffer to the list and return a new buffer
       
   942          * next time getBuffer() is called, or could decide to add a buffer
       
   943          * slice to the list and return the same buffer (if remaining
       
   944          * space is available) next time getBuffer() is called.
       
   945          *
       
   946          * @param list    The list before adding the data. Can be null.
       
   947          * @param buffer  The buffer containing the data to add to the list.
       
   948          * @param start   The start position at which data were read.
       
   949          *                The current buffer position indicates the end.
       
   950          * @return A possibly new list where a buffer containing the
       
   951          *         data read from the socket has been added.
       
   952          */
       
   953         List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
       
   954     }
       
   955 
       
   956     // An implementation of BufferSource used for unencrypted data.
       
   957     // This buffer source uses heap buffers and avoids wasting memory
       
   958     // by forwarding read only buffer slices downstream.
       
   959     // Buffers allocated through this source are simply GC'ed when
       
   960     // they are no longer referenced.
       
   961     static final class SliceBufferSource implements BufferSource {
       
   962         private final Supplier<ByteBuffer> factory;
       
   963         private volatile ByteBuffer current;
       
   964         public SliceBufferSource() {
       
   965             this(Utils::getBuffer);
       
   966         }
       
   967         public SliceBufferSource(Supplier<ByteBuffer> factory) {
       
   968             this.factory = Objects.requireNonNull(factory);
       
   969         }
       
   970 
       
   971         // reuse the same buffer if some space remains available.
       
   972         // otherwise, returns a new heap buffer.
       
   973         @Override
       
   974         public final ByteBuffer getBuffer() {
       
   975             ByteBuffer buf = current;
       
   976             buf = (buf == null || !buf.hasRemaining())
       
   977                     ? (current = factory.get()) : buf;
       
   978             assert buf.hasRemaining();
       
   979             return buf;
       
   980         }
       
   981 
       
   982         // Adds a read only slice to the list, potentially returning a
       
   983         // new list with with that slice at the end.
       
   984         @Override
       
   985         public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
       
   986             // creates a slice to add to the list
       
   987             int limit = buf.limit();
       
   988             buf.limit(buf.position());
       
   989             buf.position(start);
       
   990             ByteBuffer slice = buf.slice();
       
   991 
       
   992             // restore buffer state to what it was before creating the slice
       
   993             buf.position(buf.limit());
       
   994             buf.limit(limit);
       
   995 
       
   996             // add the buffer to the list
       
   997             return SocketTube.listOf(list, slice.asReadOnlyBuffer());
       
   998         }
       
   999     }
       
  1000 
       
  1001 
       
  1002     // An implementation of BufferSource used for encrypted data.
       
  1003     // This buffer source use direct byte buffers that will be
       
  1004     // recycled by the SocketTube subscriber.
       
  1005     //
       
  1006     static final class SSLDirectBufferSource implements BufferSource {
       
  1007         private final Supplier<ByteBuffer> factory;
       
  1008         private final HttpClientImpl client;
       
  1009         private volatile ByteBuffer current;
       
  1010 
       
  1011         public SSLDirectBufferSource(HttpClientImpl client) {
       
  1012             this.client = Objects.requireNonNull(client);
       
  1013             this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
       
  1014         }
       
  1015 
       
  1016         // Obtain a 'free' byte buffer from the pool, or return
       
  1017         // the same buffer if nothing was read at the previous cycle.
       
  1018         // The subscriber will be responsible for recycling this
       
  1019         // buffer into the pool (see SSLFlowDelegate.Reader)
       
  1020         @Override
       
  1021         public final ByteBuffer getBuffer() {
       
  1022             assert client.isSelectorThread();
       
  1023             ByteBuffer buf = current;
       
  1024             if (buf == null) {
       
  1025                 buf = current = factory.get();
       
  1026             }
       
  1027             assert buf.hasRemaining();
       
  1028             assert buf.position() == 0;
       
  1029             return buf;
       
  1030         }
       
  1031 
       
  1032         // Adds the buffer to the list. The buffer will be later returned to the
       
  1033         // pool by the subscriber (see SSLFlowDelegate.Reader).
       
  1034         // The next buffer returned by getBuffer() will be obtained from the
       
  1035         // pool. It might be the same buffer or another one.
       
  1036         // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
       
  1037         // recycling will happen in the flow before onNext returns, then the
       
  1038         // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
       
  1039         // it's shared by all SSL connections opened on that client.
       
  1040         @Override
       
  1041         public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
       
  1042             assert client.isSelectorThread();
       
  1043             assert buf.isDirect();
       
  1044             assert start == 0;
       
  1045             assert current == buf;
       
  1046             current = null;
       
  1047             buf.limit(buf.position());
       
  1048             buf.position(start);
       
  1049             // add the buffer to the list
       
  1050             return SocketTube.listOf(list, buf);
       
  1051         }
       
  1052     }
       
  1053 
       
  1054     // ===================================================================== //
   909     //                   Socket Channel Read/Write                           //
  1055     //                   Socket Channel Read/Write                           //
   910     // ===================================================================== //
  1056     // ===================================================================== //
   911     static final int MAX_BUFFERS = 3;
  1057     static final int MAX_BUFFERS = 3;
   912     static final List<ByteBuffer> EOF = List.of();
  1058     static final List<ByteBuffer> EOF = List.of();
   913     static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
  1059     static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
   916     // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
  1062     // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
   917     // When that happens, a slice of the data that has been read so far
  1063     // When that happens, a slice of the data that has been read so far
   918     // is inserted into the returned buffer list, and if the current buffer
  1064     // is inserted into the returned buffer list, and if the current buffer
   919     // has remaining space, that space will be used to read more data when
  1065     // has remaining space, that space will be used to read more data when
   920     // the channel becomes readable again.
  1066     // the channel becomes readable again.
   921     private volatile ByteBuffer current;
  1067     private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
   922     private List<ByteBuffer> readAvailable() throws IOException {
  1068         ByteBuffer buf = buffersSource.getBuffer();
   923         ByteBuffer buf = current;
       
   924         buf = (buf == null || !buf.hasRemaining())
       
   925                 ? (current = buffersSource.get()) : buf;
       
   926         assert buf.hasRemaining();
  1069         assert buf.hasRemaining();
   927 
  1070 
   928         int read;
  1071         int read;
   929         int pos = buf.position();
  1072         int pos = buf.position();
   930         List<ByteBuffer> list = null;
  1073         List<ByteBuffer> list = null;
   959             }
  1102             }
   960 
  1103 
   961             // check whether this buffer has still some free space available.
  1104             // check whether this buffer has still some free space available.
   962             // if so, we will keep it for the next round.
  1105             // if so, we will keep it for the next round.
   963             final boolean hasRemaining = buf.hasRemaining();
  1106             final boolean hasRemaining = buf.hasRemaining();
   964 
  1107             list = buffersSource.append(list, buf, pos);
   965             // creates a slice to add to the list
  1108 
   966             int limit = buf.limit();
       
   967             buf.limit(buf.position());
       
   968             buf.position(pos);
       
   969             ByteBuffer slice = buf.slice();
       
   970 
       
   971             // restore buffer state to what it was before creating the slice
       
   972             buf.position(buf.limit());
       
   973             buf.limit(limit);
       
   974 
       
   975             // add the buffer to the list
       
   976             list = addToList(list, slice.asReadOnlyBuffer());
       
   977             if (read <= 0 || list.size() == MAX_BUFFERS) {
  1109             if (read <= 0 || list.size() == MAX_BUFFERS) {
   978                 break;
  1110                 break;
   979             }
  1111             }
   980 
  1112 
   981             buf = hasRemaining ? buf : (current = buffersSource.get());
  1113             buf = buffersSource.getBuffer();
   982             pos = buf.position();
  1114             pos = buf.position();
   983             assert buf.hasRemaining();
  1115             assert buf.hasRemaining();
   984         }
  1116         }
   985         return list;
  1117         return list;
   986     }
  1118     }
   987 
  1119 
   988     private <T> List<T> addToList(List<T> list, T item) {
  1120     private static <T> List<T> listOf(List<T> list, T item) {
   989         int size = list == null ? 0 : list.size();
  1121         int size = list == null ? 0 : list.size();
   990         switch (size) {
  1122         switch (size) {
   991             case 0: return List.of(item);
  1123             case 0: return List.of(item);
   992             case 1: return List.of(list.get(0), item);
  1124             case 1: return List.of(list.get(0), item);
   993             case 2: return List.of(list.get(0), list.get(1), item);
  1125             case 2: return List.of(list.get(0), list.get(1), item);