test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
branchhttp-client-branch
changeset 55912 dfa9489d1cb1
parent 55871 45c88bf0592b
child 55941 2d423c9b73bb
equal deleted inserted replaced
55911:d3298b9365e7 55912:dfa9489d1cb1
    36 import java.nio.ByteBuffer;
    36 import java.nio.ByteBuffer;
    37 import java.nio.charset.StandardCharsets;
    37 import java.nio.charset.StandardCharsets;
    38 import java.util.*;
    38 import java.util.*;
    39 import java.util.concurrent.CompletableFuture;
    39 import java.util.concurrent.CompletableFuture;
    40 import java.util.concurrent.ExecutorService;
    40 import java.util.concurrent.ExecutorService;
    41 import java.util.concurrent.atomic.AtomicLong;
    41 import java.util.concurrent.ConcurrentLinkedQueue;
    42 import java.util.function.Consumer;
    42 import java.util.function.Consumer;
    43 import jdk.incubator.http.internal.common.HttpHeadersImpl;
    43 import jdk.incubator.http.internal.common.HttpHeadersImpl;
    44 import jdk.incubator.http.internal.frame.*;
    44 import jdk.incubator.http.internal.frame.*;
    45 import jdk.incubator.http.internal.hpack.Decoder;
    45 import jdk.incubator.http.internal.hpack.Decoder;
    46 import jdk.incubator.http.internal.hpack.DecodingCallback;
    46 import jdk.incubator.http.internal.hpack.DecodingCallback;
    71     final SettingsFrame serverSettings;
    71     final SettingsFrame serverSettings;
    72     final ExecutorService exec;
    72     final ExecutorService exec;
    73     final boolean secure;
    73     final boolean secure;
    74     volatile boolean stopping;
    74     volatile boolean stopping;
    75     volatile int nextPushStreamId = 2;
    75     volatile int nextPushStreamId = 2;
    76     volatile byte[] pingData;
    76     ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
    77     volatile CompletableFuture<Long> pingResponseHandler;
       
    78     final AtomicLong pingStamp; // milliseconds at time PING was sent
       
    79 
    77 
    80     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    78     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    81     final static byte[] EMPTY_BARRAY = new byte[0];
    79     final static byte[] EMPTY_BARRAY = new byte[0];
    82     final Random random;
    80     final Random random;
    83 
    81 
    84     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
    82     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
    85 
    83 
    86     static class Sentinel extends Http2Frame {
    84     static class Sentinel extends Http2Frame {
    87         Sentinel() { super(-1,-1);}
    85         Sentinel() { super(-1,-1);}
       
    86     }
       
    87 
       
    88     class PingRequest {
       
    89         final byte[] pingData;
       
    90         final long pingStamp;
       
    91         final CompletableFuture<Long> response;
       
    92 
       
    93         PingRequest() {
       
    94             pingData = new byte[8];
       
    95             random.nextBytes(pingData);
       
    96             pingStamp = System.currentTimeMillis();
       
    97             response = new CompletableFuture<>();
       
    98         }
       
    99 
       
   100         PingFrame frame() {
       
   101             return new PingFrame(0, pingData);
       
   102         }
       
   103 
       
   104         CompletableFuture<Long> response() {
       
   105             return response;
       
   106         }
       
   107 
       
   108         void success() {
       
   109             response.complete(System.currentTimeMillis() - pingStamp);
       
   110         }
       
   111 
       
   112         void fail(Throwable t) {
       
   113             response.completeExceptionally(t);
       
   114         }
    88     }
   115     }
    89 
   116 
    90     static Sentinel sentinel;
   117     static Sentinel sentinel;
    91 
   118 
    92     Http2TestServerConnection(Http2TestServer server,
   119     Http2TestServerConnection(Http2TestServer server,
   108         this.socket.setTcpNoDelay(true);
   135         this.socket.setTcpNoDelay(true);
   109         this.serverSettings = SettingsFrame.getDefaultSettings();
   136         this.serverSettings = SettingsFrame.getDefaultSettings();
   110         this.exec = server.exec;
   137         this.exec = server.exec;
   111         this.secure = server.secure;
   138         this.secure = server.secure;
   112         this.pushStreams = new HashSet<>();
   139         this.pushStreams = new HashSet<>();
   113         this.pingStamp = new AtomicLong();
       
   114         is = new BufferedInputStream(socket.getInputStream());
   140         is = new BufferedInputStream(socket.getInputStream());
   115         os = new BufferedOutputStream(socket.getOutputStream());
   141         os = new BufferedOutputStream(socket.getOutputStream());
   116     }
   142     }
   117 
   143 
   118     /**
   144     /**
   119      * Sends a PING frame on this connection, and completes the returned
   145      * Sends a PING frame on this connection, and completes the returned
   120      * CF when the PING ack is received. The CF is given
   146      * CF when the PING ack is received. The CF is given
   121      * an integer, whose value is the number of milliseconds
   147      * an integer, whose value is the number of milliseconds
   122      * between PING and ACK.
   148      * between PING and ACK.
   123      *
       
   124      * Only one PING is allowed to be outstanding at any time
       
   125      */
   149      */
   126     CompletableFuture<Long> sendPing() {
   150     CompletableFuture<Long> sendPing() {
   127         this.pingResponseHandler = new CompletableFuture<>();
   151         PingRequest ping = null;
   128         try {
   152         try {
   129             if (pingData != null) {
   153             ping = new PingRequest();
   130                 throw new IllegalStateException("PING already outstanding");
   154             pings.add(ping);
   131             }
   155             outputQ.put(ping.frame());
   132             pingData = new byte[8];
       
   133             random.nextBytes(pingData);
       
   134             pingStamp.set(System.currentTimeMillis());
       
   135             PingFrame ping = new PingFrame(0, pingData);
       
   136             outputQ.put(ping);
       
   137         } catch (Throwable t) {
   156         } catch (Throwable t) {
   138             pingResponseHandler.completeExceptionally(t);
   157             ping.fail(t);
   139         }
   158         }
   140         return pingResponseHandler;
   159         return ping.response();
       
   160     }
       
   161 
       
   162     /**
       
   163      * Returns the first PingRequest from Queue
       
   164      */
       
   165     private PingRequest getNextRequest() {
       
   166         return pings.poll();
   141     }
   167     }
   142 
   168 
   143     /**
   169     /**
   144      * Handles incoming Ping, which could be an ack
   170      * Handles incoming Ping, which could be an ack
   145      * or a client originated Ping
   171      * or a client originated Ping
   150             close();
   176             close();
   151             return;
   177             return;
   152         }
   178         }
   153         if (ping.getFlag(PingFrame.ACK)) {
   179         if (ping.getFlag(PingFrame.ACK)) {
   154             // did we send a Ping?
   180             // did we send a Ping?
   155             if (pingData == null) {
   181             PingRequest request = getNextRequest();
   156                 System.err.println("Invalid ping received");
   182             if (request == null) {
       
   183                 System.err.println("Invalid ping ACK received");
   157                 close();
   184                 close();
   158                 return;
   185                 return;
   159             } else if (!Arrays.equals(pingData, ping.getData())) {
   186             } else if (!Arrays.equals(request.pingData, ping.getData())) {
   160                 pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK"));
   187                 request.fail(new RuntimeException("Wrong ping data in ACK"));
   161             } else {
   188             } else {
   162                 pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0));
   189                 request.success();
   163             }
   190             }
   164             pingResponseHandler = null;
       
   165             pingData = null;
       
   166         } else {
   191         } else {
   167             // client originated PING. Just send it back with ACK set
   192             // client originated PING. Just send it back with ACK set
   168             ping.setFlag(PingFrame.ACK);
   193             ping.setFlag(PingFrame.ACK);
   169             outputQ.put(ping);
   194             outputQ.put(ping);
   170         }
   195         }