src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java
branch http-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
     1 /*
     1 /*
     2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.lang.System.Logger.Level;
    29 import java.net.InetSocketAddress;
    29 import java.net.InetSocketAddress;
    30 import java.net.URI;
    30 import java.net.URI;
    31 import java.util.Base64;
    31 import java.util.Base64;
    32 import java.util.Collections;
    32 import java.util.Collections;
       
    33 import java.util.HashSet;
    33 import java.util.HashMap;
    34 import java.util.HashMap;
    34 import java.util.HashSet;
       
    35 import java.util.Map;
    35 import java.util.Map;
    36 import java.util.Set;
    36 import java.util.Set;
    37 import java.util.concurrent.ConcurrentHashMap;
    37 import java.util.concurrent.ConcurrentHashMap;
    38 
    38 import java.util.concurrent.CompletableFuture;
       
    39 import jdk.incubator.http.internal.common.MinimalFuture;
    39 import jdk.incubator.http.internal.common.Utils;
    40 import jdk.incubator.http.internal.common.Utils;
    40 import jdk.incubator.http.internal.frame.SettingsFrame;
    41 import jdk.incubator.http.internal.frame.SettingsFrame;
    41 import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
    42 import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
    42 import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH;
    43 import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH;
    43 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
    44 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
    47 /**
    48 /**
    48  *  Http2 specific aspects of HttpClientImpl
    49  *  Http2 specific aspects of HttpClientImpl
    49  */
    50  */
    50 class Http2ClientImpl {
    51 class Http2ClientImpl {
    51 
    52 
       
    53     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    54     final static System.Logger debug =
       
    55             Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG);
       
    56 
    52     private final HttpClientImpl client;
    57     private final HttpClientImpl client;
    53 
    58 
    54     Http2ClientImpl(HttpClientImpl client) {
    59     Http2ClientImpl(HttpClientImpl client) {
    55         this.client = client;
    60         this.client = client;
    56     }
    61     }
    57 
    62 
    58     /* Map key is "scheme:host:port" */
    63     /* Map key is "scheme:host:port" */
    59     private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
    64     private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
    60 
    65 
    61     private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
    66     private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
       
    67     private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
       
    68     Collections.synchronizedMap(new HashMap<>());
       
    69 
       
    70     private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
       
    71         synchronized (waiting) {
       
    72             Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
       
    73             if (waiters == null) {
       
    74                 waiters = new HashSet<>();
       
    75                 waiting.put(key, waiters);
       
    76             }
       
    77             waiters.add(cf);
       
    78         }
       
    79     }
    62 
    80 
    63     boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
    81     boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
    64         return connections.containsKey(Http2Connection.keyFor(uri,proxy));
    82         return connections.containsKey(Http2Connection.keyFor(uri,proxy));
    65     }
    83     }
    66 
    84 
    67     /**
    85     /**
    68      * If a https request then blocks and waits until a connection is opened.
     86      * For an https request, waits asynchronously until a connection is opened.
     69      * Returns null if the request is 'http' as a different (upgrade)
     87      * Completes with null if the request is 'http' as a different (upgrade)
    70      * mechanism is used.
    88      * mechanism is used.
    71      *
    89      *
    72      * Only one connection per destination is created. Blocks when opening
     90      * Only one connection per destination is created. The returned future
     73      * connection, or when waiting for connection to be opened.
     91      * completes once the connection is open or a pending open has finished.
    76      * If the request is secure (https) then we open the connection here.
    94      * If the request is secure (https) then we open the connection here.
    77      * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
    95      * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
    78      * In latter case, when the Http2Connection is connected, putConnection() must
    96      * In latter case, when the Http2Connection is connected, putConnection() must
    79      * be called to store it.
    97      * be called to store it.
    80      */
    98      */
    81     Http2Connection getConnectionFor(HttpRequestImpl req)
    99     CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
    82             throws IOException, InterruptedException {
       
    83         URI uri = req.uri();
   100         URI uri = req.uri();
    84         InetSocketAddress proxy = req.proxy(client);
   101         InetSocketAddress proxy = req.proxy(client);
    85         String key = Http2Connection.keyFor(uri, proxy);
   102         String key = Http2Connection.keyFor(uri, proxy);
    86         Http2Connection connection = connections.get(key);
   103 
    87         if (connection != null) { // fast path if connection already exists
   104         synchronized (opening) {
    88             return connection;
   105             Http2Connection connection = connections.get(key);
       
   106             if (connection != null) { // fast path if connection already exists
       
   107                 return CompletableFuture.completedFuture(connection);
       
   108             }
       
   109 
       
   110             if (!req.secure()) {
       
   111                 return MinimalFuture.completedFuture(null);
       
   112             }
       
   113 
       
   114             if (!opening.contains(key)) {
       
   115                 debug.log(Level.DEBUG, "Opening: %s", key);
       
   116                 opening.add(key);
       
   117             } else {
       
   118                 CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
       
   119                 addToWaiting(key, cf);
       
   120                 return cf;
       
   121             }
    89         }
   122         }
    90         synchronized (opening) {
   123         return Http2Connection
    91             while ((connection = connections.get(key)) == null) {
   124                 .createAsync(req, this)
    92                 if (!req.secure()) {
   125                 .whenComplete((conn, t) -> {
    93                     return null;
   126                     debug.log(Level.DEBUG,
    94                 }
   127                             "waking up dependents with created connection");
    95                 if (!opening.contains(key)) {
   128                     synchronized (opening) {
    96                     opening.add(key);
   129                         Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
    97                     break;
   130                         debug.log(Level.DEBUG, "Opening completed: %s", key);
    98                 } else {
   131                         opening.remove(key);
    99                     opening.wait();
   132                         final Throwable cause = Utils.getCompletionCause(t);
   100                 }
   133                         if (waiters == null) {
   101             }
   134                             debug.log(Level.DEBUG, "no dependent to wake up");
   102         }
   135                             return;
   103         if (connection != null) {
   136                         } else if (cause instanceof Http2Connection.ALPNException) {
   104             return connection;
   137                             waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
   105         }
   138                                     client.theExecutor()));
   106         // we are opening the connection here blocking until it is done.
   139                         } else if (cause != null) {
   107         try {
   140                             debug.log(Level.DEBUG,
   108             connection = new Http2Connection(req, this);
   141                                     () -> "waking up dependants: failed: " + cause);
   109         } catch (Throwable t) {
   142                             waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
   110             synchronized (opening) {
   143                         } else  {
   111                 opening.remove(key);
   144                             debug.log(Level.DEBUG, "waking up dependants: succeeded");
   112                 opening.notifyAll();
   145                             waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
   113             }
   146                                     client.theExecutor()));
   114             throw t;
   147                         }
   115         }
   148                     }
   116         synchronized (opening) {
   149                 });
   117             connections.put(key, connection);
   150     }
   118             opening.remove(key);
       
   119             opening.notifyAll();
       
   120         }
       
   121         return connection;
       
   122     }
       
   123 
       
   124 
   151 
   125     /*
   152     /*
   126      * TODO: If there isn't a connection to the same destination, then
   153      * TODO: If there isn't a connection to the same destination, then
   127      * store it. If there is already a connection, then close it
   154      * store it. If there is already a connection, then close it
   128      */
   155      */
   130         connections.put(c.key(), c);
   157         connections.put(c.key(), c);
   131     }
   158     }
   132 
   159 
   133     void deleteConnection(Http2Connection c) {
   160     void deleteConnection(Http2Connection c) {
   134         connections.remove(c.key());
   161         connections.remove(c.key());
       
   162     }
       
   163 
       
   164     void stop() {
       
   165         debug.log(Level.DEBUG, "stopping");
       
   166         connections.values().stream().forEach(this::close);
       
   167         connections.clear();
       
   168     }
       
   169 
       
   170     private void close(Http2Connection h2c) {
       
   171         try { h2c.close(); } catch (Throwable t) {}
   135     }
   172     }
   136 
   173 
   137     HttpClientImpl client() {
   174     HttpClientImpl client() {
   138         return client;
   175         return client;
   139     }
   176     }