23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
28 import java.io.IOException; |
28 import java.lang.System.Logger.Level; |
29 import java.net.InetSocketAddress; |
29 import java.net.InetSocketAddress; |
30 import java.net.URI; |
30 import java.net.URI; |
31 import java.util.Base64; |
31 import java.util.Base64; |
32 import java.util.Collections; |
32 import java.util.Collections; |
|
33 import java.util.HashSet; |
33 import java.util.HashMap; |
34 import java.util.HashMap; |
34 import java.util.HashSet; |
|
35 import java.util.Map; |
35 import java.util.Map; |
36 import java.util.Set; |
36 import java.util.Set; |
37 import java.util.concurrent.ConcurrentHashMap; |
37 import java.util.concurrent.ConcurrentHashMap; |
38 |
38 import java.util.concurrent.CompletableFuture; |
|
39 import jdk.incubator.http.internal.common.MinimalFuture; |
39 import jdk.incubator.http.internal.common.Utils; |
40 import jdk.incubator.http.internal.common.Utils; |
40 import jdk.incubator.http.internal.frame.SettingsFrame; |
41 import jdk.incubator.http.internal.frame.SettingsFrame; |
41 import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE; |
42 import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE; |
42 import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH; |
43 import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH; |
43 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; |
44 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; |
47 /** |
48 /** |
48 * Http2 specific aspects of HttpClientImpl |
49 * Http2 specific aspects of HttpClientImpl |
49 */ |
50 */ |
50 class Http2ClientImpl { |
51 class Http2ClientImpl { |
51 |
52 |
|
53 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
|
54 final static System.Logger debug = |
|
55 Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG); |
|
56 |
52 private final HttpClientImpl client; |
57 private final HttpClientImpl client; |
53 |
58 |
54 Http2ClientImpl(HttpClientImpl client) { |
59 Http2ClientImpl(HttpClientImpl client) { |
55 this.client = client; |
60 this.client = client; |
56 } |
61 } |
57 |
62 |
58 /* Map key is "scheme:host:port" */ |
63 /* Map key is "scheme:host:port" */ |
59 private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>(); |
64 private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>(); |
60 |
65 |
61 private final Set<String> opening = Collections.synchronizedSet(new HashSet<>()); |
66 private final Set<String> opening = Collections.synchronizedSet(new HashSet<>()); |
|
67 private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting = |
|
68 Collections.synchronizedMap(new HashMap<>()); |
|
69 |
|
70 private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) { |
|
71 synchronized (waiting) { |
|
72 Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key); |
|
73 if (waiters == null) { |
|
74 waiters = new HashSet<>(); |
|
75 waiting.put(key, waiters); |
|
76 } |
|
77 waiters.add(cf); |
|
78 } |
|
79 } |
62 |
80 |
63 boolean haveConnectionFor(URI uri, InetSocketAddress proxy) { |
81 boolean haveConnectionFor(URI uri, InetSocketAddress proxy) { |
64 return connections.containsKey(Http2Connection.keyFor(uri,proxy)); |
82 return connections.containsKey(Http2Connection.keyFor(uri,proxy)); |
65 } |
83 } |
66 |
84 |
67 /** |
85 /** |
68 * If a https request then blocks and waits until a connection is opened. |
86 * If a https request then async waits until a connection is opened. |
69 * Returns null if the request is 'http' as a different (upgrade) |
87 * Returns null if the request is 'http' as a different (upgrade) |
70 * mechanism is used. |
88 * mechanism is used. |
71 * |
89 * |
72 * Only one connection per destination is created. Blocks when opening |
90 * Only one connection per destination is created. Blocks when opening |
73 * connection, or when waiting for connection to be opened. |
91 * connection, or when waiting for connection to be opened. |
76 * If the request is secure (https) then we open the connection here. |
94 * If the request is secure (https) then we open the connection here. |
77 * If not, then the more complicated upgrade from 1.1 to 2 happens (not here) |
95 * If not, then the more complicated upgrade from 1.1 to 2 happens (not here) |
78 * In latter case, when the Http2Connection is connected, putConnection() must |
96 * In latter case, when the Http2Connection is connected, putConnection() must |
79 * be called to store it. |
97 * be called to store it. |
80 */ |
98 */ |
81 Http2Connection getConnectionFor(HttpRequestImpl req) |
99 CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) { |
82 throws IOException, InterruptedException { |
|
83 URI uri = req.uri(); |
100 URI uri = req.uri(); |
84 InetSocketAddress proxy = req.proxy(client); |
101 InetSocketAddress proxy = req.proxy(client); |
85 String key = Http2Connection.keyFor(uri, proxy); |
102 String key = Http2Connection.keyFor(uri, proxy); |
86 Http2Connection connection = connections.get(key); |
103 |
87 if (connection != null) { // fast path if connection already exists |
104 synchronized (opening) { |
88 return connection; |
105 Http2Connection connection = connections.get(key); |
|
106 if (connection != null) { // fast path if connection already exists |
|
107 return CompletableFuture.completedFuture(connection); |
|
108 } |
|
109 |
|
110 if (!req.secure()) { |
|
111 return MinimalFuture.completedFuture(null); |
|
112 } |
|
113 |
|
114 if (!opening.contains(key)) { |
|
115 debug.log(Level.DEBUG, "Opening: %s", key); |
|
116 opening.add(key); |
|
117 } else { |
|
118 CompletableFuture<Http2Connection> cf = new MinimalFuture<>(); |
|
119 addToWaiting(key, cf); |
|
120 return cf; |
|
121 } |
89 } |
122 } |
90 synchronized (opening) { |
123 return Http2Connection |
91 while ((connection = connections.get(key)) == null) { |
124 .createAsync(req, this) |
92 if (!req.secure()) { |
125 .whenComplete((conn, t) -> { |
93 return null; |
126 debug.log(Level.DEBUG, |
94 } |
127 "waking up dependents with created connection"); |
95 if (!opening.contains(key)) { |
128 synchronized (opening) { |
96 opening.add(key); |
129 Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key); |
97 break; |
130 debug.log(Level.DEBUG, "Opening completed: %s", key); |
98 } else { |
131 opening.remove(key); |
99 opening.wait(); |
132 final Throwable cause = Utils.getCompletionCause(t); |
100 } |
133 if (waiters == null) { |
101 } |
134 debug.log(Level.DEBUG, "no dependent to wake up"); |
102 } |
135 return; |
103 if (connection != null) { |
136 } else if (cause instanceof Http2Connection.ALPNException) { |
104 return connection; |
137 waiters.forEach((cf1) -> cf1.completeAsync(() -> null, |
105 } |
138 client.theExecutor())); |
106 // we are opening the connection here blocking until it is done. |
139 } else if (cause != null) { |
107 try { |
140 debug.log(Level.DEBUG, |
108 connection = new Http2Connection(req, this); |
141 () -> "waking up dependants: failed: " + cause); |
109 } catch (Throwable t) { |
142 waiters.forEach((cf1) -> cf1.completeExceptionally(cause)); |
110 synchronized (opening) { |
143 } else { |
111 opening.remove(key); |
144 debug.log(Level.DEBUG, "waking up dependants: succeeded"); |
112 opening.notifyAll(); |
145 waiters.forEach((cf1) -> cf1.completeAsync(() -> conn, |
113 } |
146 client.theExecutor())); |
114 throw t; |
147 } |
115 } |
148 } |
116 synchronized (opening) { |
149 }); |
117 connections.put(key, connection); |
150 } |
118 opening.remove(key); |
|
119 opening.notifyAll(); |
|
120 } |
|
121 return connection; |
|
122 } |
|
123 |
|
124 |
151 |
125 /* |
152 /* |
126 * TODO: If there isn't a connection to the same destination, then |
153 * TODO: If there isn't a connection to the same destination, then |
127 * store it. If there is already a connection, then close it |
154 * store it. If there is already a connection, then close it |
128 */ |
155 */ |