jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java
changeset 46836 2684bef98033
parent 42460 7133f144981a
equal deleted inserted replaced
46188:4586bc5d28d1 46836:2684bef98033
    25 package jdk.incubator.http.internal.common;
    25 package jdk.incubator.http.internal.common;
    26 
    26 
    27 
    27 
    28 import java.io.Closeable;
    28 import java.io.Closeable;
    29 import java.io.IOException;
    29 import java.io.IOException;
    30 import java.nio.ByteBuffer;
       
    31 import java.util.ArrayList;
    30 import java.util.ArrayList;
    32 import java.util.Arrays;
    31 import java.util.Arrays;
    33 import java.util.Deque;
    32 import java.util.Deque;
    34 import java.util.List;
    33 import java.util.List;
    35 import java.util.concurrent.ConcurrentLinkedDeque;
    34 import java.util.concurrent.ConcurrentLinkedDeque;
    36 import java.util.concurrent.atomic.AtomicInteger;
    35 import java.util.concurrent.atomic.AtomicInteger;
    37 import java.util.function.BiConsumer;
       
    38 
    36 
    39 public class AsyncWriteQueue implements Closeable {
    37 public class AsyncWriteQueue implements Closeable {
       
    38 
       
    39     @FunctionalInterface
       
    40     public static interface AsyncConsumer {
       
    41         /**
       
    42          * Takes an array of buffer references and attempts to send the data
       
    43          * downstream. If not all the data can be sent, then push back
       
    44          * to the source queue by calling {@code source.setDelayed(buffers)}
       
    45          * and return false. If all the data was successfully sent downstream
       
    46          * then return true.
       
    47          * @param buffers An array of ByteBufferReference containing data
       
    48          *                to send downstream.
       
    49          * @param source This AsyncWriteQueue.
       
    50          * @return true if all the data could be sent downstream, false otherwise.
       
    51          */
       
    52         boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
       
    53     }
    40 
    54 
    41     private static final int IDLE    = 0;     // nobody is flushing from the queue
    55     private static final int IDLE    = 0;     // nobody is flushing from the queue
    42     private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
    56     private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
    43     private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
    57     private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
    44                                               // the other thread put data into the queue.
    58                                               // the other thread put data into the queue.
    49 
    63 
    50     private static final int CLOSED = 4;      // queue is closed
    64     private static final int CLOSED = 4;      // queue is closed
    51 
    65 
    52     private final AtomicInteger state = new AtomicInteger(IDLE);
    66     private final AtomicInteger state = new AtomicInteger(IDLE);
    53     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
    67     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
    54     private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;
    68     private final AsyncConsumer consumeAction;
    55 
    69 
    56     // Queue may be processed in two modes:
    70     // Queue may be processed in two modes:
    57     // 1. if(!doFullDrain) - invoke callback on each chunk
    71     // 1. if(!doFullDrain) - invoke callback on each chunk
    58     // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
    72     // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
    59     private final boolean doFullDrain;
    73     private final boolean doFullDrain;
    60 
    74 
    61     private ByteBufferReference[] delayedElement = null;
    75     private ByteBufferReference[] delayedElement = null;
    62 
    76 
    63     public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
    77     public AsyncWriteQueue(AsyncConsumer consumeAction) {
    64         this(consumeAction, true);
    78         this(consumeAction, true);
    65     }
    79     }
    66 
    80 
    67     public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
    81     public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
    68         this.consumeAction = consumeAction;
    82         this.consumeAction = consumeAction;
    69         this.doFullDrain = doFullDrain;
    83         this.doFullDrain = doFullDrain;
    70     }
    84     }
    71 
    85 
    72     public void put(ByteBufferReference[] e) throws IOException {
    86     public void put(ByteBufferReference[] e) throws IOException {
   154         } else {
   168         } else {
   155             element = drain();
   169             element = drain();
   156         }
   170         }
   157         while(true) {
   171         while(true) {
   158             while (element != null) {
   172             while (element != null) {
   159                 consumeAction.accept(element, this);
   173                 if (!consumeAction.trySend(element, this)) {
   160                 if (state.get() == DELAYED) {
       
   161                     return;
   174                     return;
   162                 }
   175                 }
   163                 element = drain();
   176                 element = drain();
   164             }
   177             }
   165             switch (state.get()) {
   178             switch (state.get()) {