src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
branch http-client-branch
changeset 56474 fe2bf7b369b8
parent 56463 b583caf69b39
child 56481 247ed0848e48
equal deleted inserted replaced
56463:b583caf69b39 56474:fe2bf7b369b8
    43 import java.util.concurrent.ConcurrentLinkedQueue;
    43 import java.util.concurrent.ConcurrentLinkedQueue;
    44 import java.util.concurrent.Executor;
    44 import java.util.concurrent.Executor;
    45 import java.util.concurrent.Flow;
    45 import java.util.concurrent.Flow;
    46 import java.util.concurrent.Flow.Subscriber;
    46 import java.util.concurrent.Flow.Subscriber;
    47 import java.util.concurrent.atomic.AtomicInteger;
    47 import java.util.concurrent.atomic.AtomicInteger;
       
    48 import java.util.function.Consumer;
    48 
    49 
    49 /**
    50 /**
    50  * Implements SSL using two SubscriberWrappers.
    51  * Implements SSL using two SubscriberWrappers.
    51  *
    52  *
    52  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    94     final CompletableFuture<String> alpnCF; // completes on initial handshake
    95     final CompletableFuture<String> alpnCF; // completes on initial handshake
    95     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    96     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    96     volatile boolean close_notify_received;
    97     volatile boolean close_notify_received;
    97     final CompletableFuture<Void> readerCF;
    98     final CompletableFuture<Void> readerCF;
    98     final CompletableFuture<Void> writerCF;
    99     final CompletableFuture<Void> writerCF;
       
   100     final Consumer<ByteBuffer> recycler;
    99     static AtomicInteger scount = new AtomicInteger(1);
   101     static AtomicInteger scount = new AtomicInteger(1);
   100     final int id;
   102     final int id;
   101 
   103 
   102     /**
   104     /**
   103      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
   105      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
   107     public SSLFlowDelegate(SSLEngine engine,
   109     public SSLFlowDelegate(SSLEngine engine,
   108                            Executor exec,
   110                            Executor exec,
   109                            Subscriber<? super List<ByteBuffer>> downReader,
   111                            Subscriber<? super List<ByteBuffer>> downReader,
   110                            Subscriber<? super List<ByteBuffer>> downWriter)
   112                            Subscriber<? super List<ByteBuffer>> downWriter)
   111     {
   113     {
       
   114         this(engine, exec, null, downReader, downWriter);
       
   115     }
       
   116 
       
   117     /**
       
   118      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
       
   119      * Flow.Subscriber requires an associated {@link CompletableFuture}
       
   120      * for errors that need to be signaled from downstream to upstream.
       
   121      */
       
   122     public SSLFlowDelegate(SSLEngine engine,
       
   123             Executor exec,
       
   124             Consumer<ByteBuffer> recycler,
       
   125             Subscriber<? super List<ByteBuffer>> downReader,
       
   126             Subscriber<? super List<ByteBuffer>> downWriter)
       
   127         {
   112         this.id = scount.getAndIncrement();
   128         this.id = scount.getAndIncrement();
   113         this.tubeName = String.valueOf(downWriter);
   129         this.tubeName = String.valueOf(downWriter);
       
   130         this.recycler = recycler;
   114         this.reader = new Reader();
   131         this.reader = new Reader();
   115         this.writer = new Writer();
   132         this.writer = new Writer();
   116         this.engine = engine;
   133         this.engine = engine;
   117         this.exec = exec;
   134         this.exec = exec;
   118         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   135         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   210      *     OK: return generated buffers.
   227      *     OK: return generated buffers.
   211      *
   228      *
   212      * Upstream subscription strategy is to try and keep no more than
   229      * Upstream subscription strategy is to try and keep no more than
   213      * TARGET_BUFSIZE bytes in readBuf
   230      * TARGET_BUFSIZE bytes in readBuf
   214      */
   231      */
   215     class Reader extends SubscriberWrapper {
   232     class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
   216         // Maximum record size is 16k.
   233         // Maximum record size is 16k.
    217         // Because SocketTube can feed us up to 3 16K buffers,
    234         // Because SocketTube can feed us up to 3 16K buffers,
   218         // then setting this size to 16K means that the readBuf
   235         // then setting this size to 16K means that the readBuf
   219         // can store up to 64K-1 (16K-1 + 3*16K)
   236         // can store up to 64K-1 (16K-1 + 3*16K)
   220         static final int TARGET_BUFSIZE = 16 * 1024;
   237         static final int TARGET_BUFSIZE = 16 * 1024;
   233             super();
   250             super();
   234             scheduler = SequentialScheduler.synchronizedScheduler(
   251             scheduler = SequentialScheduler.synchronizedScheduler(
   235                                                 new ReaderDownstreamPusher());
   252                                                 new ReaderDownstreamPusher());
   236             this.readBuf = ByteBuffer.allocate(1024);
   253             this.readBuf = ByteBuffer.allocate(1024);
   237             readBuf.limit(0); // keep in read mode
   254             readBuf.limit(0); // keep in read mode
       
   255         }
       
   256 
       
   257         @Override
       
   258         public boolean supportsRecycling() {
       
   259             return recycler != null;
   238         }
   260         }
   239 
   261 
   240         protected SchedulingAction enterScheduling() {
   262         protected SchedulingAction enterScheduling() {
   241             return enterReadScheduling();
   263             return enterReadScheduling();
   242         }
   264         }
   290                     readBuf.compact();
   312                     readBuf.compact();
   291                     while (readBuf.remaining() < buf.remaining())
   313                     while (readBuf.remaining() < buf.remaining())
   292                         reallocReadBuf();
   314                         reallocReadBuf();
   293                     readBuf.put(buf);
   315                     readBuf.put(buf);
   294                     readBuf.flip();
   316                     readBuf.flip();
       
   317                     // should be safe to call inside lock
       
   318                     // since the only implementation
       
   319                     // offers the buffer to an unbounded queue.
       
   320                     // WARNING: do not touch buf after this point!
       
   321                     if (recycler != null) recycler.accept(buf);
   295                 }
   322                 }
   296                 if (complete) {
   323                 if (complete) {
   297                     this.completing = complete;
   324                     this.completing = complete;
   298                 }
   325                 }
   299             }
   326             }