25 package jdk.incubator.http.internal.common; |
25 package jdk.incubator.http.internal.common; |
26 |
26 |
27 |
27 |
28 import java.io.Closeable; |
28 import java.io.Closeable; |
29 import java.io.IOException; |
29 import java.io.IOException; |
30 import java.nio.ByteBuffer; |
|
31 import java.util.ArrayList; |
30 import java.util.ArrayList; |
32 import java.util.Arrays; |
31 import java.util.Arrays; |
33 import java.util.Deque; |
32 import java.util.Deque; |
34 import java.util.List; |
33 import java.util.List; |
35 import java.util.concurrent.ConcurrentLinkedDeque; |
34 import java.util.concurrent.ConcurrentLinkedDeque; |
36 import java.util.concurrent.atomic.AtomicInteger; |
35 import java.util.concurrent.atomic.AtomicInteger; |
37 import java.util.function.BiConsumer; |
|
38 |
36 |
39 public class AsyncWriteQueue implements Closeable { |
37 public class AsyncWriteQueue implements Closeable { |
|
38 |
|
39 @FunctionalInterface |
|
40 public static interface AsyncConsumer { |
|
41 /** |
|
42 * Takes an array of buffer reference and attempt to send the data |
|
43 * downstream. If not all the data can be sent, then push back |
|
44 * to the source queue by calling {@code source.setDelayed(buffers)} |
|
45 * and return false. If all the data was successfully sent downstream |
|
46 * then returns true. |
|
47 * @param buffers An array of ButeBufferReference containing data |
|
48 * to send downstream. |
|
49 * @param source This AsyncWriteQueue. |
|
50 * @return true if all the data could be sent downstream, false otherwise. |
|
51 */ |
|
52 boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source); |
|
53 } |
40 |
54 |
41 private static final int IDLE = 0; // nobody is flushing from the queue |
55 private static final int IDLE = 0; // nobody is flushing from the queue |
42 private static final int FLUSHING = 1; // there is the only thread flushing from the queue |
56 private static final int FLUSHING = 1; // there is the only thread flushing from the queue |
43 private static final int REFLUSHING = 2; // while one thread was flushing from the queue |
57 private static final int REFLUSHING = 2; // while one thread was flushing from the queue |
44 // the other thread put data into the queue. |
58 // the other thread put data into the queue. |
49 |
63 |
50 private static final int CLOSED = 4; // queue is closed |
64 private static final int CLOSED = 4; // queue is closed |
51 |
65 |
52 private final AtomicInteger state = new AtomicInteger(IDLE); |
66 private final AtomicInteger state = new AtomicInteger(IDLE); |
53 private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>(); |
67 private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>(); |
54 private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction; |
68 private final AsyncConsumer consumeAction; |
55 |
69 |
56 // Queue may be processed in two modes: |
70 // Queue may be processed in two modes: |
57 // 1. if(!doFullDrain) - invoke callback on each chunk |
71 // 1. if(!doFullDrain) - invoke callback on each chunk |
58 // 2. if(doFullDrain) - drain the whole queue, merge all chunks into the single array and invoke callback |
72 // 2. if(doFullDrain) - drain the whole queue, merge all chunks into the single array and invoke callback |
59 private final boolean doFullDrain; |
73 private final boolean doFullDrain; |
60 |
74 |
61 private ByteBufferReference[] delayedElement = null; |
75 private ByteBufferReference[] delayedElement = null; |
62 |
76 |
63 public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) { |
77 public AsyncWriteQueue(AsyncConsumer consumeAction) { |
64 this(consumeAction, true); |
78 this(consumeAction, true); |
65 } |
79 } |
66 |
80 |
67 public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) { |
81 public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { |
68 this.consumeAction = consumeAction; |
82 this.consumeAction = consumeAction; |
69 this.doFullDrain = doFullDrain; |
83 this.doFullDrain = doFullDrain; |
70 } |
84 } |
71 |
85 |
72 public void put(ByteBufferReference[] e) throws IOException { |
86 public void put(ByteBufferReference[] e) throws IOException { |