27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.ByteBuffer; |
29 import java.nio.ByteBuffer; |
30 import java.nio.channels.SelectionKey; |
30 import java.nio.channels.SelectionKey; |
31 import java.util.concurrent.atomic.AtomicLong; |
31 import java.util.concurrent.atomic.AtomicLong; |
|
32 import jdk.incubator.http.internal.common.SequentialScheduler; |
32 |
33 |
33 /* |
34 /* |
34 * Receives incoming data from the channel on demand and converts it into a |
35 * Receives incoming data from the channel on demand and converts it into a |
35 * stream of WebSocket messages which are then delivered to the supplied message |
36 * stream of WebSocket messages which are then delivered to the supplied message |
36 * consumer in a strict sequential order and non-recursively. In other words, |
37 * consumer in a strict sequential order and non-recursively. In other words, |
55 private final RawChannel channel; |
56 private final RawChannel channel; |
56 private final FrameConsumer frameConsumer; |
57 private final FrameConsumer frameConsumer; |
57 private final Frame.Reader reader = new Frame.Reader(); |
58 private final Frame.Reader reader = new Frame.Reader(); |
58 private final RawChannel.RawEvent event = createHandler(); |
59 private final RawChannel.RawEvent event = createHandler(); |
59 private final AtomicLong demand = new AtomicLong(); |
60 private final AtomicLong demand = new AtomicLong(); |
60 private final CooperativeHandler handler; |
61 private final SequentialScheduler pushScheduler; |
61 |
62 |
62 private ByteBuffer data; |
63 private ByteBuffer data; |
63 private volatile int state; |
64 private volatile int state; |
64 |
65 |
65 private static final int UNREGISTERED = 0; |
66 private static final int UNREGISTERED = 0; |
72 this.frameConsumer = new FrameConsumer(this.messageConsumer); |
73 this.frameConsumer = new FrameConsumer(this.messageConsumer); |
73 this.data = channel.initialByteBuffer(); |
74 this.data = channel.initialByteBuffer(); |
74 // To ensure the initial non-final `data` will be visible |
75 // To ensure the initial non-final `data` will be visible |
75 // (happens-before) when `handler` invokes `pushContinuously` |
76 // (happens-before) when `handler` invokes `pushContinuously` |
76 // the following assignment is done last: |
77 // the following assignment is done last: |
77 handler = new CooperativeHandler(this::pushContinuously); |
78 pushScheduler = new SequentialScheduler(new PushContinuouslyTask()); |
78 } |
79 } |
79 |
80 |
80 private RawChannel.RawEvent createHandler() { |
81 private RawChannel.RawEvent createHandler() { |
81 return new RawChannel.RawEvent() { |
82 return new RawChannel.RawEvent() { |
82 |
83 |
86 } |
87 } |
87 |
88 |
88 @Override |
89 @Override |
89 public void handle() { |
90 public void handle() { |
90 state = AVAILABLE; |
91 state = AVAILABLE; |
91 handler.handle(); |
92 pushScheduler.runOrSchedule(); |
92 } |
93 } |
93 }; |
94 }; |
94 } |
95 } |
95 |
96 |
96 void request(long n) { |
97 void request(long n) { |
97 if (n < 0L) { |
98 if (n < 0L) { |
98 throw new IllegalArgumentException("Negative: " + n); |
99 throw new IllegalArgumentException("Negative: " + n); |
99 } |
100 } |
100 demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); |
101 demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); |
101 handler.handle(); |
102 pushScheduler.runOrSchedule(); |
102 } |
103 } |
103 |
104 |
104 void acknowledge() { |
105 void acknowledge() { |
105 long x = demand.decrementAndGet(); |
106 long x = demand.decrementAndGet(); |
106 if (x < 0) { |
107 if (x < 0) { |
111 /* |
112 /* |
112 * Stops the machinery from reading and delivering messages permanently, |
113 * Stops the machinery from reading and delivering messages permanently, |
113 * regardless of the current demand and data availability. |
114 * regardless of the current demand and data availability. |
114 */ |
115 */ |
115 void close() { |
116 void close() { |
116 handler.stop(); |
117 pushScheduler.stop(); |
117 } |
118 } |
118 |
119 |
119 private void pushContinuously() { |
120 private class PushContinuouslyTask |
120 while (!handler.isStopped()) { |
121 extends SequentialScheduler.CompleteRestartableTask |
121 if (data.hasRemaining()) { |
122 { |
122 if (demand.get() > 0) { |
123 @Override |
123 try { |
124 public void run() { |
124 int oldPos = data.position(); |
125 while (!pushScheduler.isStopped()) { |
125 reader.readFrame(data, frameConsumer); |
126 if (data.hasRemaining()) { |
126 int newPos = data.position(); |
127 if (demand.get() > 0) { |
127 assert oldPos != newPos : data; // reader always consumes bytes |
128 try { |
128 } catch (FailWebSocketException e) { |
129 int oldPos = data.position(); |
129 handler.stop(); |
130 reader.readFrame(data, frameConsumer); |
130 messageConsumer.onError(e); |
131 int newPos = data.position(); |
|
132 assert oldPos != newPos : data; // reader always consumes bytes |
|
133 } catch (FailWebSocketException e) { |
|
134 pushScheduler.stop(); |
|
135 messageConsumer.onError(e); |
|
136 } |
|
137 continue; |
131 } |
138 } |
132 continue; |
139 break; |
133 } |
140 } |
134 break; |
141 switch (state) { |
135 } |
142 case WAITING: |
136 switch (state) { |
|
137 case WAITING: |
|
138 return; |
|
139 case UNREGISTERED: |
|
140 try { |
|
141 state = WAITING; |
|
142 channel.registerEvent(event); |
|
143 } catch (IOException e) { |
|
144 handler.stop(); |
|
145 messageConsumer.onError(e); |
|
146 } |
|
147 return; |
|
148 case AVAILABLE: |
|
149 try { |
|
150 data = channel.read(); |
|
151 } catch (IOException e) { |
|
152 handler.stop(); |
|
153 messageConsumer.onError(e); |
|
154 return; |
143 return; |
155 } |
144 case UNREGISTERED: |
156 if (data == null) { // EOF |
145 try { |
157 handler.stop(); |
146 state = WAITING; |
158 messageConsumer.onComplete(); |
147 channel.registerEvent(event); |
|
148 } catch (IOException e) { |
|
149 pushScheduler.stop(); |
|
150 messageConsumer.onError(e); |
|
151 } |
159 return; |
152 return; |
160 } else if (!data.hasRemaining()) { // No data at the moment |
153 case AVAILABLE: |
161 // Pretty much a "goto", reusing the existing code path |
154 try { |
162 // for registration |
155 data = channel.read(); |
163 state = UNREGISTERED; |
156 } catch (IOException e) { |
164 } |
157 pushScheduler.stop(); |
165 continue; |
158 messageConsumer.onError(e); |
166 default: |
159 return; |
167 throw new InternalError(String.valueOf(state)); |
160 } |
|
161 if (data == null) { // EOF |
|
162 pushScheduler.stop(); |
|
163 messageConsumer.onComplete(); |
|
164 return; |
|
165 } else if (!data.hasRemaining()) { // No data at the moment |
|
166 // Pretty much a "goto", reusing the existing code path |
|
167 // for registration |
|
168 state = UNREGISTERED; |
|
169 } |
|
170 continue; |
|
171 default: |
|
172 throw new InternalError(String.valueOf(state)); |
|
173 } |
168 } |
174 } |
169 } |
175 } |
170 } |
176 } |
171 } |
177 } |
172 |
|