36 import java.nio.ByteBuffer; |
36 import java.nio.ByteBuffer; |
37 import java.nio.charset.StandardCharsets; |
37 import java.nio.charset.StandardCharsets; |
38 import java.util.*; |
38 import java.util.*; |
39 import java.util.concurrent.CompletableFuture; |
39 import java.util.concurrent.CompletableFuture; |
40 import java.util.concurrent.ExecutorService; |
40 import java.util.concurrent.ExecutorService; |
41 import java.util.concurrent.atomic.AtomicLong; |
41 import java.util.concurrent.ConcurrentLinkedQueue; |
42 import java.util.function.Consumer; |
42 import java.util.function.Consumer; |
43 import jdk.incubator.http.internal.common.HttpHeadersImpl; |
43 import jdk.incubator.http.internal.common.HttpHeadersImpl; |
44 import jdk.incubator.http.internal.frame.*; |
44 import jdk.incubator.http.internal.frame.*; |
45 import jdk.incubator.http.internal.hpack.Decoder; |
45 import jdk.incubator.http.internal.hpack.Decoder; |
46 import jdk.incubator.http.internal.hpack.DecodingCallback; |
46 import jdk.incubator.http.internal.hpack.DecodingCallback; |
71 final SettingsFrame serverSettings; |
71 final SettingsFrame serverSettings; |
72 final ExecutorService exec; |
72 final ExecutorService exec; |
73 final boolean secure; |
73 final boolean secure; |
74 volatile boolean stopping; |
74 volatile boolean stopping; |
75 volatile int nextPushStreamId = 2; |
75 volatile int nextPushStreamId = 2; |
76 volatile byte[] pingData; |
76 ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>(); |
77 volatile CompletableFuture<Long> pingResponseHandler; |
|
78 final AtomicLong pingStamp; // milliseconds at time PING was sent |
|
79 |
77 |
80 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); |
78 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); |
81 final static byte[] EMPTY_BARRAY = new byte[0]; |
79 final static byte[] EMPTY_BARRAY = new byte[0]; |
82 final Random random; |
80 final Random random; |
83 |
81 |
84 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); |
82 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); |
85 |
83 |
86 static class Sentinel extends Http2Frame { |
84 static class Sentinel extends Http2Frame { |
87 Sentinel() { super(-1,-1);} |
85 Sentinel() { super(-1,-1);} |
|
86 } |
|
87 |
|
88 class PingRequest { |
|
89 final byte[] pingData; |
|
90 final long pingStamp; |
|
91 final CompletableFuture<Long> response; |
|
92 |
|
93 PingRequest() { |
|
94 pingData = new byte[8]; |
|
95 random.nextBytes(pingData); |
|
96 pingStamp = System.currentTimeMillis(); |
|
97 response = new CompletableFuture<>(); |
|
98 } |
|
99 |
|
100 PingFrame frame() { |
|
101 return new PingFrame(0, pingData); |
|
102 } |
|
103 |
|
104 CompletableFuture<Long> response() { |
|
105 return response; |
|
106 } |
|
107 |
|
108 void success() { |
|
109 response.complete(System.currentTimeMillis() - pingStamp); |
|
110 } |
|
111 |
|
112 void fail(Throwable t) { |
|
113 response.completeExceptionally(t); |
|
114 } |
88 } |
115 } |
89 |
116 |
90 static Sentinel sentinel; |
117 static Sentinel sentinel; |
91 |
118 |
92 Http2TestServerConnection(Http2TestServer server, |
119 Http2TestServerConnection(Http2TestServer server, |
108 this.socket.setTcpNoDelay(true); |
135 this.socket.setTcpNoDelay(true); |
109 this.serverSettings = SettingsFrame.getDefaultSettings(); |
136 this.serverSettings = SettingsFrame.getDefaultSettings(); |
110 this.exec = server.exec; |
137 this.exec = server.exec; |
111 this.secure = server.secure; |
138 this.secure = server.secure; |
112 this.pushStreams = new HashSet<>(); |
139 this.pushStreams = new HashSet<>(); |
113 this.pingStamp = new AtomicLong(); |
|
114 is = new BufferedInputStream(socket.getInputStream()); |
140 is = new BufferedInputStream(socket.getInputStream()); |
115 os = new BufferedOutputStream(socket.getOutputStream()); |
141 os = new BufferedOutputStream(socket.getOutputStream()); |
116 } |
142 } |
117 |
143 |
118 /** |
144 /** |
119 * Sends a PING frame on this connection, and completes the returned |
145 * Sends a PING frame on this connection, and completes the returned |
120 * CF when the PING ack is received. The CF is given |
146 * CF when the PING ack is received. The CF is given |
121 * a long, whose value is the number of milliseconds |
147 * a long, whose value is the number of milliseconds |
122 * between PING and ACK. |
148 * between PING and ACK. |
123 * |
|
124 * Only one PING is allowed to be outstanding at any time |
|
125 */ |
149 */ |
126 CompletableFuture<Long> sendPing() { |
150 CompletableFuture<Long> sendPing() { |
127 this.pingResponseHandler = new CompletableFuture<>(); |
151 PingRequest ping = null; |
128 try { |
152 try { |
129 if (pingData != null) { |
153 ping = new PingRequest(); |
130 throw new IllegalStateException("PING already outstanding"); |
154 pings.add(ping); |
131 } |
155 outputQ.put(ping.frame()); |
132 pingData = new byte[8]; |
|
133 random.nextBytes(pingData); |
|
134 pingStamp.set(System.currentTimeMillis()); |
|
135 PingFrame ping = new PingFrame(0, pingData); |
|
136 outputQ.put(ping); |
|
137 } catch (Throwable t) { |
156 } catch (Throwable t) { |
138 pingResponseHandler.completeExceptionally(t); |
157 ping.fail(t); |
139 } |
158 } |
140 return pingResponseHandler; |
159 return ping.response(); |
|
160 } |
|
161 |
|
162 /** |
|
163 * Removes and returns the PingRequest at the head of the queue, or null if no PING is outstanding |
|
164 */ |
|
165 private PingRequest getNextRequest() { |
|
166 return pings.poll(); |
141 } |
167 } |
142 |
168 |
143 /** |
169 /** |
144 * Handles incoming Ping, which could be an ack |
170 * Handles incoming Ping, which could be an ack |
145 * or a client originated Ping |
171 * or a client originated Ping |
150 close(); |
176 close(); |
151 return; |
177 return; |
152 } |
178 } |
153 if (ping.getFlag(PingFrame.ACK)) { |
179 if (ping.getFlag(PingFrame.ACK)) { |
154 // did we send a Ping? |
180 // did we send a Ping? |
155 if (pingData == null) { |
181 PingRequest request = getNextRequest(); |
156 System.err.println("Invalid ping received"); |
182 if (request == null) { |
|
183 System.err.println("Invalid ping ACK received"); |
157 close(); |
184 close(); |
158 return; |
185 return; |
159 } else if (!Arrays.equals(pingData, ping.getData())) { |
186 } else if (!Arrays.equals(request.pingData, ping.getData())) { |
160 pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK")); |
187 request.fail(new RuntimeException("Wrong ping data in ACK")); |
161 } else { |
188 } else { |
162 pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0)); |
189 request.success(); |
163 } |
190 } |
164 pingResponseHandler = null; |
|
165 pingData = null; |
|
166 } else { |
191 } else { |
167 // client originated PING. Just send it back with ACK set |
192 // client originated PING. Just send it back with ACK set |
168 ping.setFlag(PingFrame.ACK); |
193 ping.setFlag(PingFrame.ACK); |
169 outputQ.put(ping); |
194 outputQ.put(ping); |
170 } |
195 } |