author | dfuchs |
Mon, 23 Apr 2018 15:45:40 +0100 | |
branch | http-client-branch |
changeset 56474 | fe2bf7b369b8 |
parent 56451 | 9585061fdb04 |
child 56480 | 97bff67fed21 |
permissions | -rw-r--r-- |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1 |
/* |
48523
b95b08f3e1a8
8194883: Unhandleable Push Promises should be cancelled
chegar
parents:
48083
diff
changeset
|
2 |
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
3 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
4 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
5 |
* This code is free software; you can redistribute it and/or modify it |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
6 |
* under the terms of the GNU General Public License version 2 only, as |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
7 |
* published by the Free Software Foundation. Oracle designates this |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
8 |
* particular file as subject to the "Classpath" exception as provided |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
9 |
* by Oracle in the LICENSE file that accompanied this code. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
10 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
11 |
* This code is distributed in the hope that it will be useful, but WITHOUT |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
12 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
13 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
14 |
* version 2 for more details (a copy is included in the LICENSE file that |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
15 |
* accompanied this code). |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
16 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
17 |
* You should have received a copy of the GNU General Public License version |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
18 |
* 2 along with this work; if not, write to the Free Software Foundation, |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
19 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
20 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
21 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
22 |
* or visit www.oracle.com if you need additional information or have any |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
23 |
* questions. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
24 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
25 |
|
49765 | 26 |
package jdk.internal.net.http; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
27 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
28 |
import java.io.IOException; |
49765 | 29 |
import java.io.UncheckedIOException; |
48083 | 30 |
import java.lang.System.Logger.Level; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
31 |
import java.net.URI; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
32 |
import java.nio.ByteBuffer; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
33 |
import java.util.ArrayList; |
48083 | 34 |
import java.util.Collections; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
35 |
import java.util.List; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
36 |
import java.util.concurrent.CompletableFuture; |
48083 | 37 |
import java.util.concurrent.ConcurrentLinkedDeque; |
38 |
import java.util.concurrent.ConcurrentLinkedQueue; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
39 |
import java.util.concurrent.Executor; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
40 |
import java.util.concurrent.Flow; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
41 |
import java.util.concurrent.Flow.Subscription; |
48083 | 42 |
import java.util.concurrent.atomic.AtomicReference; |
49765 | 43 |
import java.util.function.BiPredicate; |
44 |
import java.net.http.HttpClient; |
|
45 |
import java.net.http.HttpHeaders; |
|
46 |
import java.net.http.HttpRequest; |
|
47 |
import java.net.http.HttpResponse; |
|
48 |
import java.net.http.HttpResponse.BodySubscriber; |
|
49 |
import jdk.internal.net.http.common.*; |
|
50 |
import jdk.internal.net.http.frame.*; |
|
51 |
import jdk.internal.net.http.hpack.DecodingCallback; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
52 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
53 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
54 |
* Http/2 Stream handling. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
55 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
56 |
* REQUESTS |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
57 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
58 |
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
59 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
60 |
* sendRequest() -- sendHeadersOnly() + sendBody() |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
61 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
62 |
* sendBodyAsync() -- calls sendBody() in an executor thread. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
63 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
64 |
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
65 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
66 |
* sendRequestAsync() -- calls sendRequest() in an executor thread |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
67 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
68 |
* RESPONSES |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
69 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
70 |
* Multiple responses can be received per request. Responses are queued up on |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
71 |
* a LinkedList of CF<HttpResponse> and the first one on the list is completed |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
72 |
* with the next response |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
73 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
74 |
* getResponseAsync() -- queries list of response CFs and returns first one |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
75 |
* if one exists. Otherwise, creates one and adds it to list |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
76 |
* and returns it. Completion is achieved through the |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
77 |
* incoming() upcall from connection reader thread. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
78 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
79 |
* getResponse() -- calls getResponseAsync() and waits for CF to complete |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
80 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
81 |
* responseBodyAsync() -- calls responseBody() in an executor thread. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
82 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
83 |
* incoming() -- entry point called from connection reader thread. Frames are |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
84 |
* either handled immediately without blocking or for data frames |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
85 |
* placed on the stream's inputQ which is consumed by the stream's |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
86 |
* reader thread. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
87 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
88 |
* PushedStream sub class |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
89 |
* ====================== |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
90 |
* Sending side methods are not used because the request comes from a PUSH_PROMISE |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
91 |
* frame sent by the server. When a PUSH_PROMISE is received the PushedStream |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
92 |
* is created. PushedStream does not use responseCF list as there can be only |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
93 |
* one response. The CF is created when the object is created and when the response |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
94 |
* HEADERS frame is received the object is completed. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
95 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
96 |
class Stream<T> extends ExchangeImpl<T> { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
97 |
|
49765 | 98 |
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
48083 | 99 |
|
100 |
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>(); |
|
101 |
final SequentialScheduler sched = |
|
102 |
SequentialScheduler.synchronizedScheduler(this::schedule); |
|
49765 | 103 |
final SubscriptionBase userSubscription = |
104 |
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
105 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
106 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
107 |
* This stream's identifier. Assigned lazily by the Http2Connection before |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
108 |
* the stream's first frame is sent. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
109 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
110 |
protected volatile int streamid; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
111 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
112 |
long requestContentLen; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
113 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
114 |
final Http2Connection connection; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
115 |
final HttpRequestImpl request; |
49765 | 116 |
final HeadersConsumer rspHeadersConsumer; |
117 |
final HttpHeadersImpl responseHeaders; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
118 |
final HttpHeadersImpl requestPseudoHeaders; |
48083 | 119 |
volatile HttpResponse.BodySubscriber<T> responseSubscriber; |
120 |
final HttpRequest.BodyPublisher requestPublisher; |
|
121 |
volatile RequestSubscriber requestSubscriber; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
122 |
volatile int responseCode; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
123 |
volatile Response response; |
49765 | 124 |
// The exception with which this stream was canceled. |
125 |
private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
126 |
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); |
48083 | 127 |
volatile CompletableFuture<T> responseBodyCF; |
49765 | 128 |
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber; |
129 |
volatile boolean stopRequested; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
130 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
131 |
/** True if END_STREAM has been seen in a frame received on this stream. */ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
132 |
private volatile boolean remotelyClosed; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
133 |
private volatile boolean closed; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
134 |
private volatile boolean endStreamSent; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
135 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
136 |
// state flags |
48083 | 137 |
private boolean requestSent, responseReceived; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
138 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
139 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
140 |
* A reference to this Stream's connection Send Window controller. The |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
141 |
* stream MUST acquire the appropriate amount of Send Window before |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
142 |
* sending any data. Will be null for PushedStreams, as they cannot send data. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
143 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
144 |
private final WindowController windowController; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
145 |
private final WindowUpdateSender windowUpdater; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
146 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
147 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
148 |
HttpConnection connection() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
149 |
return connection.connection; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
150 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
151 |
|
48083 | 152 |
/** |
153 |
* Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() } |
|
154 |
* or after user subscription window has re-opened, from SubscriptionBase.request()
|
155 |
*/ |
|
156 |
private void schedule() { |
|
49765 | 157 |
boolean onCompleteCalled = false; |
158 |
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber; |
|
159 |
try { |
|
160 |
if (subscriber == null) { |
|
161 |
subscriber = responseSubscriber = pendingResponseSubscriber; |
|
162 |
if (subscriber == null) { |
|
163 |
// can't process anything yet |
|
164 |
return; |
|
165 |
} else { |
|
166 |
if (debug.on()) debug.log("subscribing user subscriber"); |
|
167 |
subscriber.onSubscribe(userSubscription); |
|
168 |
} |
|
48083 | 169 |
} |
49765 | 170 |
while (!inputQ.isEmpty()) { |
171 |
Http2Frame frame = inputQ.peek(); |
|
172 |
if (frame instanceof ResetFrame) { |
|
173 |
inputQ.remove(); |
|
174 |
handleReset((ResetFrame)frame); |
|
175 |
return; |
|
176 |
} |
|
177 |
DataFrame df = (DataFrame)frame; |
|
178 |
boolean finished = df.getFlag(DataFrame.END_STREAM); |
|
48083 | 179 |
|
49765 | 180 |
List<ByteBuffer> buffers = df.getData(); |
181 |
List<ByteBuffer> dsts = Collections.unmodifiableList(buffers); |
|
182 |
int size = Utils.remaining(dsts, Integer.MAX_VALUE); |
|
183 |
if (size == 0 && finished) { |
|
184 |
inputQ.remove(); |
|
48083 | 185 |
Log.logTrace("responseSubscriber.onComplete"); |
49765 | 186 |
if (debug.on()) debug.log("incoming: onComplete"); |
48083 | 187 |
sched.stop(); |
49765 | 188 |
subscriber.onComplete(); |
189 |
onCompleteCalled = true; |
|
48083 | 190 |
setEndStreamReceived(); |
191 |
return; |
|
49765 | 192 |
} else if (userSubscription.tryDecrement()) { |
193 |
inputQ.remove(); |
|
194 |
Log.logTrace("responseSubscriber.onNext {0}", size); |
|
195 |
if (debug.on()) debug.log("incoming: onNext(%d)", size); |
|
196 |
subscriber.onNext(dsts); |
|
197 |
if (consumed(df)) { |
|
198 |
Log.logTrace("responseSubscriber.onComplete"); |
|
199 |
if (debug.on()) debug.log("incoming: onComplete"); |
|
200 |
sched.stop(); |
|
201 |
subscriber.onComplete(); |
|
202 |
onCompleteCalled = true; |
|
203 |
setEndStreamReceived(); |
|
204 |
return; |
|
205 |
} |
|
206 |
} else { |
|
207 |
if (stopRequested) break; |
|
208 |
return; |
|
48083 | 209 |
} |
210 |
} |
|
49765 | 211 |
} catch (Throwable throwable) { |
212 |
errorRef.compareAndSet(null, throwable); |
|
48083 | 213 |
} |
49765 | 214 |
|
215 |
Throwable t = errorRef.get(); |
|
48083 | 216 |
if (t != null) { |
217 |
sched.stop(); |
|
49765 | 218 |
try { |
219 |
if (!onCompleteCalled) { |
|
220 |
if (debug.on()) |
|
221 |
debug.log("calling subscriber.onError: %s", (Object)t); |
|
222 |
subscriber.onError(t); |
|
223 |
} else { |
|
224 |
if (debug.on()) |
|
225 |
debug.log("already completed: dropping error %s", (Object)t); |
|
226 |
} |
|
227 |
} catch (Throwable x) { |
|
228 |
Log.logError("Subscriber::onError threw exception: {0}", (Object)t); |
|
229 |
} finally { |
|
230 |
cancelImpl(t); |
|
231 |
} |
|
48083 | 232 |
} |
233 |
} |
|
234 |
||
235 |
// Callback invoked after the Response BodySubscriber has consumed the |
|
236 |
// buffers contained in a DataFrame. |
|
237 |
// Returns true if END_STREAM is reached, false otherwise. |
|
238 |
private boolean consumed(DataFrame df) { |
|
239 |
// RFC 7540 6.1: |
|
240 |
// The entire DATA frame payload is included in flow control, |
|
241 |
// including the Pad Length and Padding fields if present |
|
242 |
int len = df.payloadLength(); |
|
243 |
connection.windowUpdater.update(len); |
|
244 |
||
245 |
if (!df.getFlag(DataFrame.END_STREAM)) { |
|
246 |
// Don't send window update on a stream which is |
|
247 |
// closed or half closed. |
|
248 |
windowUpdater.update(len); |
|
249 |
return false; // more data coming |
|
250 |
} |
|
251 |
return true; // end of stream |
|
252 |
} |
|
253 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
254 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
255 |
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
43999
4cc44dd9f14f
8164625: Pooled HttpConnection should be removed during close
prappo
parents:
43984
diff
changeset
|
256 |
boolean returnConnectionToPool, |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
257 |
Executor executor) |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
258 |
{ |
49765 | 259 |
try { |
260 |
Log.logTrace("Reading body on stream {0}", streamid); |
|
261 |
BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response)); |
|
262 |
CompletableFuture<T> cf = receiveData(bodySubscriber, executor); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
263 |
|
49765 | 264 |
PushGroup<?> pg = exchange.getPushGroup(); |
265 |
if (pg != null) { |
|
266 |
// if an error occurs make sure it is recorded in the PushGroup |
|
267 |
cf = cf.whenComplete((t, e) -> pg.pushError(e)); |
|
268 |
} |
|
269 |
return cf; |
|
270 |
} catch (Throwable t) { |
|
271 |
// may be thrown by handler.apply |
|
272 |
cancelImpl(t); |
|
273 |
return MinimalFuture.failedFuture(t); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
274 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
275 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
276 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
277 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
278 |
public String toString() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
279 |
StringBuilder sb = new StringBuilder(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
280 |
sb.append("streamid: ") |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
281 |
.append(streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
282 |
return sb.toString(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
283 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
284 |
|
48083 | 285 |
private void receiveDataFrame(DataFrame df) { |
286 |
inputQ.add(df); |
|
287 |
sched.runOrSchedule(); |
|
288 |
} |
|
289 |
||
290 |
/** Handles a RESET frame. RESET is always handled inline in the queue. */ |
|
291 |
private void receiveResetFrame(ResetFrame frame) { |
|
292 |
inputQ.add(frame); |
|
293 |
sched.runOrSchedule(); |
|
43984 | 294 |
} |
295 |
||
48083 | 296 |
// pushes entire response body into response subscriber |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
297 |
// blocking when required by local or remote flow control |
49765 | 298 |
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) { |
299 |
responseBodyCF = new MinimalFuture<>(); |
|
300 |
// We want to allow the subscriber's getBody() method to block so it |
|
301 |
// can work with InputStreams. So, we offload execution. |
|
302 |
executor.execute(() -> { |
|
303 |
try { |
|
304 |
bodySubscriber.getBody().whenComplete((T body, Throwable t) -> { |
|
305 |
if (t == null) |
|
306 |
responseBodyCF.complete(body); |
|
307 |
else |
|
308 |
responseBodyCF.completeExceptionally(t); |
|
309 |
}); |
|
310 |
} catch(Throwable t) { |
|
311 |
cancelImpl(t); |
|
312 |
} |
|
313 |
}); |
|
48083 | 314 |
|
315 |
if (isCanceled()) { |
|
316 |
Throwable t = getCancelCause(); |
|
317 |
responseBodyCF.completeExceptionally(t); |
|
43984 | 318 |
} else { |
49765 | 319 |
pendingResponseSubscriber = bodySubscriber; |
320 |
sched.runOrSchedule(); // in case data waiting already to be processed |
|
43984 | 321 |
} |
48083 | 322 |
return responseBodyCF; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
323 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
324 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
325 |
@Override |
43984 | 326 |
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
327 |
return sendBodyImpl().thenApply( v -> this); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
328 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
329 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
330 |
/**
 * Initialises the per-stream state: the owning connection, the window
 * controller, the request (and its publisher) taken from the exchange,
 * empty response and pseudo-header collections, the response headers
 * consumer and the window update sender.
 *
 * @param connection       the HTTP/2 connection this stream belongs to
 * @param e                the exchange; also supplies the request
 * @param windowController the window controller stored for this stream
 */
@SuppressWarnings("unchecked")
Stream(Http2Connection connection,
       Exchange<T> e,
       WindowController windowController)
{
    super(e);
    this.connection = connection;
    this.windowController = windowController;
    this.request = e.request();
    this.requestPublisher = this.request.requestPublisher;  // may be null
    this.responseHeaders = new HttpHeadersImpl();
    this.rspHeadersConsumer = new HeadersConsumer();
    this.requestPseudoHeaders = new HttpHeadersImpl();
    // NEW
    this.windowUpdater = new StreamWindowUpdateSender(connection);
}
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
346 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
347 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
348 |
* Entry point from Http2Connection reader thread. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
349 |
* |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
350 |
* Data frames will be removed by response body thread. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
351 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
352 |
void incoming(Http2Frame frame) throws IOException { |
49765 | 353 |
if (debug.on()) debug.log("incoming: %s", frame); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
354 |
if ((frame instanceof HeaderFrame)) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
355 |
HeaderFrame hframe = (HeaderFrame)frame; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
356 |
if (hframe.endHeaders()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
357 |
Log.logTrace("handling response (streamid={0})", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
358 |
handleResponse(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
359 |
if (hframe.getFlag(HeaderFrame.END_STREAM)) { |
48083 | 360 |
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
361 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
362 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
363 |
} else if (frame instanceof DataFrame) { |
48083 | 364 |
receiveDataFrame((DataFrame)frame); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
365 |
} else { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
366 |
otherFrame(frame); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
367 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
368 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
369 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
370 |
void otherFrame(Http2Frame frame) throws IOException { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
371 |
switch (frame.type()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
372 |
case WindowUpdateFrame.TYPE: |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
373 |
incoming_windowUpdate((WindowUpdateFrame) frame); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
374 |
break; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
375 |
case ResetFrame.TYPE: |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
376 |
incoming_reset((ResetFrame) frame); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
377 |
break; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
378 |
case PriorityFrame.TYPE: |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
379 |
incoming_priority((PriorityFrame) frame); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
380 |
break; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
381 |
default: |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
382 |
String msg = "Unexpected frame: " + frame.toString(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
383 |
throw new IOException(msg); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
384 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
385 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
386 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
387 |
// The Hpack decoder decodes into one of these consumers of name,value pairs |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
388 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
389 |
DecodingCallback rspHeadersConsumer() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
390 |
return rspHeadersConsumer; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
391 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
392 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
393 |
protected void handleResponse() throws IOException { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
394 |
responseCode = (int)responseHeaders |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
395 |
.firstValueAsLong(":status") |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
396 |
.orElseThrow(() -> new IOException("no statuscode in response")); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
397 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
398 |
response = new Response( |
49765 | 399 |
request, exchange, responseHeaders, connection(), |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
400 |
responseCode, HttpClient.Version.HTTP_2); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
401 |
|
48083 | 402 |
/* TODO: review if needs to be removed |
403 |
the value is not used, but in case `content-length` doesn't parse as |
|
404 |
long, there will be NumberFormatException. If left as is, make sure |
|
405 |
code up the stack handles NFE correctly. */ |
|
406 |
responseHeaders.firstValueAsLong("content-length"); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
407 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
408 |
if (Log.headers()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
409 |
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
410 |
Log.dumpHeaders(sb, " ", responseHeaders); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
411 |
Log.logHeaders(sb.toString()); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
412 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
413 |
|
49765 | 414 |
// this will clear the response headers |
415 |
rspHeadersConsumer.reset(); |
|
416 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
417 |
completeResponse(response); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
418 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
419 |
|
48083 | 420 |
void incoming_reset(ResetFrame frame) { |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
421 |
Log.logTrace("Received RST_STREAM on stream {0}", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
422 |
if (endStreamReceived()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
423 |
Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
424 |
} else if (closed) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
425 |
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
426 |
} else { |
48083 | 427 |
// put it in the input queue in order to read all |
428 |
// pending data frames first. Indeed, a server may send |
|
429 |
// RST_STREAM after sending END_STREAM, in which case we should |
|
430 |
// ignore it. However, we won't know if we have received END_STREAM |
|
431 |
// or not until all pending data frames are read. |
|
432 |
receiveResetFrame(frame); |
|
433 |
// RST_STREAM was pushed to the queue. It will be handled by |
|
434 |
// asyncReceive after all pending data frames have been |
|
435 |
// processed. |
|
436 |
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
437 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
438 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
439 |
|
48083 | 440 |
void handleReset(ResetFrame frame) { |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
441 |
Log.logTrace("Handling RST_STREAM on stream {0}", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
442 |
if (!closed) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
443 |
close(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
444 |
int error = frame.getErrorCode(); |
48083 | 445 |
completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
446 |
} else { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
447 |
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
448 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
449 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
450 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
451 |
void incoming_priority(PriorityFrame frame) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
452 |
// TODO: implement priority |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
453 |
throw new UnsupportedOperationException("Not implemented"); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
454 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
455 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
456 |
private void incoming_windowUpdate(WindowUpdateFrame frame) |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
457 |
throws IOException |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
458 |
{ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
459 |
int amount = frame.getUpdate(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
460 |
if (amount <= 0) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
461 |
Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
462 |
streamid, streamid, amount); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
463 |
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
464 |
} else { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
465 |
assert streamid != 0; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
466 |
boolean success = windowController.increaseStreamWindow(amount, streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
467 |
if (!success) { // overflow |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
468 |
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
469 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
470 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
471 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
472 |
|
49765 | 473 |
void incoming_pushPromise(HttpRequestImpl pushRequest, |
474 |
PushedStream<T> pushStream) |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
475 |
throws IOException |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
476 |
{ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
477 |
if (Log.requests()) { |
49765 | 478 |
Log.logRequest("PUSH_PROMISE: " + pushRequest.toString()); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
479 |
} |
49765 | 480 |
PushGroup<T> pushGroup = exchange.getPushGroup(); |
48523
b95b08f3e1a8
8194883: Unhandleable Push Promises should be cancelled
chegar
parents:
48083
diff
changeset
|
481 |
if (pushGroup == null) { |
b95b08f3e1a8
8194883: Unhandleable Push Promises should be cancelled
chegar
parents:
48083
diff
changeset
|
482 |
Log.logTrace("Rejecting push promise stream " + streamid); |
b95b08f3e1a8
8194883: Unhandleable Push Promises should be cancelled
chegar
parents:
48083
diff
changeset
|
483 |
connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM); |
b95b08f3e1a8
8194883: Unhandleable Push Promises should be cancelled
chegar
parents:
48083
diff
changeset
|
484 |
pushStream.close(); |
48083 | 485 |
return; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
486 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
487 |
|
49765 | 488 |
PushGroup.Acceptor<T> acceptor = null; |
489 |
boolean accepted = false; |
|
490 |
try { |
|
491 |
acceptor = pushGroup.acceptPushRequest(pushRequest); |
|
492 |
accepted = acceptor.accepted(); |
|
493 |
} catch (Throwable t) { |
|
494 |
if (debug.on()) |
|
495 |
debug.log("PushPromiseHandler::applyPushPromise threw exception %s", |
|
496 |
(Object)t); |
|
497 |
} |
|
498 |
if (!accepted) { |
|
499 |
// cancel / reject |
|
500 |
IOException ex = new IOException("Stream " + streamid + " cancelled by users handler"); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
501 |
if (Log.trace()) { |
49765 | 502 |
Log.logTrace("No body subscriber for {0}: {1}", pushRequest, |
503 |
ex.getMessage()); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
504 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
505 |
pushStream.cancelImpl(ex); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
506 |
return; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
507 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
508 |
|
49765 | 509 |
assert accepted && acceptor != null; |
510 |
CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf(); |
|
511 |
HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler(); |
|
512 |
assert pushHandler != null; |
|
513 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
514 |
pushStream.requestSent(); |
49765 | 515 |
pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ? |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
516 |
// setup housekeeping for when the push is received |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
517 |
// TODO: deal with ignoring of CF anti-pattern |
49765 | 518 |
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
519 |
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { |
48083 | 520 |
t = Utils.getCompletionCause(t); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
521 |
if (Log.trace()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
522 |
Log.logTrace("Push completed on stream {0} for {1}{2}", |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
523 |
pushStream.streamid, resp, |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
524 |
((t==null) ? "": " with exception " + t)); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
525 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
526 |
if (t != null) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
527 |
pushGroup.pushError(t); |
49765 | 528 |
pushResponseCF.completeExceptionally(t); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
529 |
} else { |
49765 | 530 |
pushResponseCF.complete(resp); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
531 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
532 |
pushGroup.pushCompleted(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
533 |
}); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
534 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
535 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
536 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
537 |
private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
538 |
HttpHeadersImpl h = request.getSystemHeaders(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
539 |
if (contentLength > 0) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
540 |
h.setHeader("content-length", Long.toString(contentLength)); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
541 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
542 |
setPseudoHeaderFields(); |
49765 | 543 |
HttpHeaders sysh = filter(h); |
544 |
HttpHeaders userh = filter(request.getUserHeaders()); |
|
545 |
OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
546 |
if (contentLength == 0) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
547 |
f.setFlag(HeadersFrame.END_STREAM); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
548 |
endStreamSent = true; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
549 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
550 |
return f; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
551 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
552 |
|
49765 | 553 |
private boolean hasProxyAuthorization(HttpHeaders headers) { |
554 |
return headers.firstValue("proxy-authorization") |
|
555 |
.isPresent(); |
|
556 |
} |
|
557 |
||
558 |
// Determines whether we need to build a new HttpHeader object. |
|
559 |
// |
|
560 |
// Ideally we should pass the filter to OutgoingHeaders refactor the |
|
561 |
// code that creates the HeaderFrame to honor the filter. |
|
562 |
// We're not there yet - so depending on the filter we need to |
|
563 |
// apply and the content of the header we will try to determine |
|
564 |
// whether anything might need to be filtered. |
|
565 |
// If nothing needs filtering then we can just use the |
|
566 |
// original headers. |
|
567 |
private boolean needsFiltering(HttpHeaders headers, |
|
568 |
BiPredicate<String, List<String>> filter) { |
|
569 |
if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) { |
|
570 |
// we're either connecting or proxying |
|
571 |
// slight optimization: we only need to filter out |
|
572 |
// disabled schemes, so if there are none just |
|
573 |
// pass through. |
|
574 |
return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER) |
|
575 |
&& hasProxyAuthorization(headers); |
|
576 |
} else { |
|
577 |
// we're talking to a server, either directly or through |
|
578 |
// a tunnel. |
|
579 |
// Slight optimization: we only need to filter out |
|
580 |
// proxy authorization headers, so if there are none just |
|
581 |
// pass through. |
|
582 |
return hasProxyAuthorization(headers); |
|
583 |
} |
|
584 |
} |
|
585 |
||
586 |
private HttpHeaders filter(HttpHeaders headers) { |
|
587 |
HttpConnection conn = connection(); |
|
588 |
BiPredicate<String, List<String>> filter = |
|
589 |
conn.headerFilter(request); |
|
590 |
if (needsFiltering(headers, filter)) { |
|
591 |
return ImmutableHeaders.of(headers.map(), filter); |
|
592 |
} |
|
593 |
return headers; |
|
594 |
} |
|
595 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
596 |
private void setPseudoHeaderFields() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
597 |
HttpHeadersImpl hdrs = requestPseudoHeaders; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
598 |
String method = request.method(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
599 |
hdrs.setHeader(":method", method); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
600 |
URI uri = request.uri(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
601 |
hdrs.setHeader(":scheme", uri.getScheme()); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
602 |
// TODO: userinfo deprecated. Needs to be removed |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
603 |
hdrs.setHeader(":authority", uri.getAuthority()); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
604 |
// TODO: ensure header names beginning with : not in user headers |
49765 | 605 |
String query = uri.getRawQuery(); |
606 |
String path = uri.getRawPath(); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
607 |
if (path == null || path.isEmpty()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
608 |
if (method.equalsIgnoreCase("OPTIONS")) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
609 |
path = "*"; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
610 |
} else { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
611 |
path = "/"; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
612 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
613 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
614 |
if (query != null) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
615 |
path += "?" + query; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
616 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
617 |
hdrs.setHeader(":path", path); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
618 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
619 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
620 |
HttpHeadersImpl getRequestPseudoHeaders() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
621 |
return requestPseudoHeaders; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
622 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
623 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
624 |
/** Sets endStreamReceived. Should be called only once. */ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
625 |
void setEndStreamReceived() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
626 |
assert remotelyClosed == false: "Unexpected endStream already set"; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
627 |
remotelyClosed = true; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
628 |
responseReceived(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
629 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
630 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
631 |
/** Tells whether, or not, the END_STREAM Flag has been seen in any frame |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
632 |
* received on this stream. */ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
633 |
private boolean endStreamReceived() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
634 |
return remotelyClosed; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
635 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
636 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
637 |
@Override |
48083 | 638 |
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
49765 | 639 |
if (debug.on()) debug.log("sendHeadersOnly()"); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
640 |
if (Log.requests() && request != null) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
641 |
Log.logRequest(request.toString()); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
642 |
} |
48083 | 643 |
if (requestPublisher != null) { |
644 |
requestContentLen = requestPublisher.contentLength(); |
|
645 |
} else { |
|
646 |
requestContentLen = 0; |
|
647 |
} |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
648 |
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
649 |
connection.sendFrame(f); |
48083 | 650 |
CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>(); |
651 |
cf.complete(this); // #### good enough for now |
|
652 |
return cf; |
|
653 |
} |
|
654 |
||
655 |
@Override |
|
656 |
void released() { |
|
657 |
if (streamid > 0) { |
|
49765 | 658 |
if (debug.on()) debug.log("Released stream %d", streamid); |
48083 | 659 |
// remove this stream from the Http2Connection map. |
660 |
connection.closeStream(streamid); |
|
661 |
} else { |
|
49765 | 662 |
if (debug.on()) debug.log("Can't release stream %d", streamid); |
48083 | 663 |
} |
664 |
} |
|
665 |
||
666 |
@Override |
|
667 |
void completed() { |
|
668 |
// There should be nothing to do here: the stream should have |
|
669 |
// been already closed (or will be closed shortly after). |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
670 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
671 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
672 |
void registerStream(int id) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
673 |
this.streamid = id; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
674 |
connection.putStream(this, streamid); |
49765 | 675 |
if (debug.on()) debug.log("Registered stream %d", id); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
676 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
677 |
|
48083 | 678 |
void signalWindowUpdate() { |
679 |
RequestSubscriber subscriber = requestSubscriber; |
|
680 |
assert subscriber != null; |
|
49765 | 681 |
if (debug.on()) debug.log("Signalling window update"); |
48083 | 682 |
subscriber.sendScheduler.runOrSchedule(); |
683 |
} |
|
684 |
||
685 |
// Sentinel buffer: RequestSubscriber.onComplete() queues this instance to mark
// the end of the request body, and trySend() recognises it by identity (==).
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); |
|
45708
3512073b446f
8178699: Fail to send async requests if server doesn't response the first one
xiaofeya
parents:
45531
diff
changeset
|
686 |
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
687 |
// can be < 0 if the actual length is not known. |
48083 | 688 |
private final long contentLength; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
689 |
private volatile long remainingContentLength; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
690 |
private volatile Subscription subscription; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
691 |
|
48083 | 692 |
// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers. |
693 |
// 1) The data that was published by the request body Publisher, and |
|
694 |
// 2) the COMPLETED sentinel, since onComplete can be invoked without demand. |
|
695 |
final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>(); |
|
696 |
||
697 |
private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
698 |
// A scheduler used to honor window updates. Writing must be paused |
|
699 |
// when the window is exhausted, and resumed when the window acquires |
|
700 |
// some space. The sendScheduler makes it possible to implement this |
|
701 |
// behaviour in an asynchronous non-blocking way. |
|
702 |
// See RequestSubscriber::trySend below. |
|
703 |
final SequentialScheduler sendScheduler; |
|
704 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
705 |
RequestSubscriber(long contentLen) { |
48083 | 706 |
this.contentLength = contentLen; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
707 |
this.remainingContentLength = contentLen; |
48083 | 708 |
this.sendScheduler = |
709 |
SequentialScheduler.synchronizedScheduler(this::trySend); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
710 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
711 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
712 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
713 |
public void onSubscribe(Flow.Subscription subscription) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
714 |
if (this.subscription != null) { |
48083 | 715 |
throw new IllegalStateException("already subscribed"); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
716 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
717 |
this.subscription = subscription; |
49765 | 718 |
if (debug.on()) |
719 |
debug.log("RequestSubscriber: onSubscribe, request 1"); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
720 |
subscription.request(1); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
721 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
722 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
723 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
724 |
public void onNext(ByteBuffer item) { |
49765 | 725 |
if (debug.on()) |
726 |
debug.log("RequestSubscriber: onNext(%d)", item.remaining()); |
|
48083 | 727 |
int size = outgoing.size(); |
728 |
assert size == 0 : "non-zero size: " + size; |
|
729 |
onNextImpl(item); |
|
730 |
} |
|
731 |
||
732 |
private void onNextImpl(ByteBuffer item) { |
|
733 |
// Got some more request body bytes to send. |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
734 |
if (requestBodyCF.isDone()) { |
48083 | 735 |
// stream already cancelled, probably in timeout |
736 |
sendScheduler.stop(); |
|
737 |
subscription.cancel(); |
|
738 |
return; |
|
739 |
} |
|
740 |
outgoing.add(item); |
|
741 |
sendScheduler.runOrSchedule(); |
|
742 |
} |
|
743 |
||
744 |
@Override |
|
745 |
public void onError(Throwable throwable) { |
|
49765 | 746 |
if (debug.on()) |
747 |
debug.log(() -> "RequestSubscriber: onError: " + throwable); |
|
48083 | 748 |
// ensure that errors are handled within the flow. |
749 |
if (errorRef.compareAndSet(null, throwable)) { |
|
750 |
sendScheduler.runOrSchedule(); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
751 |
} |
48083 | 752 |
} |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
753 |
|
48083 | 754 |
@Override |
755 |
public void onComplete() { |
|
49765 | 756 |
if (debug.on()) debug.log("RequestSubscriber: onComplete"); |
48083 | 757 |
int size = outgoing.size(); |
758 |
assert size == 0 || size == 1 : "non-zero or one size: " + size; |
|
759 |
// last byte of request body has been obtained. |
|
760 |
// ensure that everything is completed within the flow. |
|
761 |
onNextImpl(COMPLETED); |
|
762 |
} |
|
763 |
||
764 |
// Attempts to send the data, if any. |
|
765 |
// Handles errors and completion state. |
|
766 |
// Pause writing if the send window is exhausted, resume it if the |
|
767 |
// send window has some bytes that can be acquired. |
|
768 |
void trySend() { |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
769 |
try { |
48083 | 770 |
// handle errors raised by onError; |
771 |
Throwable t = errorRef.get(); |
|
772 |
if (t != null) { |
|
773 |
sendScheduler.stop(); |
|
774 |
if (requestBodyCF.isDone()) return; |
|
775 |
subscription.cancel(); |
|
776 |
requestBodyCF.completeExceptionally(t); |
|
49765 | 777 |
cancelImpl(t); |
48083 | 778 |
return; |
779 |
} |
|
780 |
||
781 |
do { |
|
782 |
// handle COMPLETED; |
|
783 |
ByteBuffer item = outgoing.peekFirst(); |
|
784 |
if (item == null) return; |
|
785 |
else if (item == COMPLETED) { |
|
786 |
sendScheduler.stop(); |
|
787 |
complete(); |
|
788 |
return; |
|
789 |
} |
|
790 |
||
791 |
// handle bytes to send downstream |
|
792 |
while (item.hasRemaining()) { |
|
49765 | 793 |
if (debug.on()) debug.log("trySend: %d", item.remaining()); |
48083 | 794 |
assert !endStreamSent : "internal error, send data after END_STREAM flag"; |
795 |
DataFrame df = getDataFrame(item); |
|
796 |
if (df == null) { |
|
49765 | 797 |
if (debug.on()) |
798 |
debug.log("trySend: can't send yet: %d", item.remaining()); |
|
48083 | 799 |
return; // the send window is exhausted: come back later |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
800 |
} |
48083 | 801 |
|
802 |
if (contentLength > 0) { |
|
803 |
remainingContentLength -= df.getDataLength(); |
|
804 |
if (remainingContentLength < 0) { |
|
805 |
String msg = connection().getConnectionFlow() |
|
806 |
+ " stream=" + streamid + " " |
|
807 |
+ "[" + Thread.currentThread().getName() + "] " |
|
808 |
+ "Too many bytes in request body. Expected: " |
|
809 |
+ contentLength + ", got: " |
|
810 |
+ (contentLength - remainingContentLength); |
|
811 |
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
812 |
throw new IOException(msg); |
|
813 |
} else if (remainingContentLength == 0) { |
|
814 |
df.setFlag(DataFrame.END_STREAM); |
|
815 |
endStreamSent = true; |
|
816 |
} |
|
817 |
} |
|
49765 | 818 |
if (debug.on()) |
819 |
debug.log("trySend: sending: %d", df.getDataLength()); |
|
48083 | 820 |
connection.sendDataFrame(df); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
821 |
} |
48083 | 822 |
assert !item.hasRemaining(); |
823 |
ByteBuffer b = outgoing.removeFirst(); |
|
824 |
assert b == item; |
|
825 |
} while (outgoing.peekFirst() != null); |
|
826 |
||
49765 | 827 |
if (debug.on()) debug.log("trySend: request 1"); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
828 |
subscription.request(1); |
48083 | 829 |
} catch (Throwable ex) { |
49765 | 830 |
if (debug.on()) debug.log("trySend: ", ex); |
48083 | 831 |
sendScheduler.stop(); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
832 |
subscription.cancel(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
833 |
requestBodyCF.completeExceptionally(ex); |
49765 | 834 |
// need to cancel the stream to 1. tell the server |
835 |
// we don't want to receive any more data and |
|
836 |
// 2. ensure that the operation ref count will be |
|
837 |
// decremented on the HttpClient. |
|
838 |
cancelImpl(ex); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
839 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
840 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
841 |
|
48083 | 842 |
private void complete() throws IOException { |
843 |
long remaining = remainingContentLength; |
|
844 |
long written = contentLength - remaining; |
|
845 |
if (remaining > 0) { |
|
846 |
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
847 |
// let trySend() handle the exception |
|
848 |
throw new IOException(connection().getConnectionFlow() |
|
849 |
+ " stream=" + streamid + " " |
|
850 |
+ "[" + Thread.currentThread().getName() +"] " |
|
851 |
+ "Too few bytes returned by the publisher (" |
|
852 |
+ written + "/" |
|
853 |
+ contentLength + ")"); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
854 |
} |
48083 | 855 |
if (!endStreamSent) { |
856 |
endStreamSent = true; |
|
857 |
connection.sendDataFrame(getEmptyEndStreamDataFrame()); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
858 |
} |
48083 | 859 |
requestBodyCF.complete(null); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
860 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
861 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
862 |
|
48083 | 863 |
/** |
864 |
* Send a RESET frame to tell server to stop sending data on this stream |
|
865 |
*/ |
|
866 |
@Override |
|
867 |
public CompletableFuture<Void> ignoreBody() { |
|
868 |
try { |
|
869 |
connection.resetStream(streamid, ResetFrame.STREAM_CLOSED); |
|
870 |
return MinimalFuture.completedFuture(null); |
|
871 |
} catch (Throwable e) { |
|
872 |
Log.logTrace("Error resetting stream {0}", e.toString()); |
|
873 |
return MinimalFuture.failedFuture(e); |
|
874 |
} |
|
875 |
} |
|
876 |
||
877 |
DataFrame getDataFrame(ByteBuffer buffer) { |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
878 |
int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
879 |
// blocks waiting for stream send window, if exhausted |
48083 | 880 |
int actualAmount = windowController.tryAcquire(requestAmount, streamid, this); |
881 |
if (actualAmount <= 0) return null; |
|
48703
3eae36c6baa5
8195823: Buffers given to response body subscribers should not contain unprocessed HTTP data
chegar
parents:
48523
diff
changeset
|
882 |
ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount); |
48083 | 883 |
DataFrame df = new DataFrame(streamid, 0 , outBuf); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
884 |
return df; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
885 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
886 |
|
48083 | 887 |
private DataFrame getEmptyEndStreamDataFrame() { |
888 |
return new DataFrame(streamid, DataFrame.END_STREAM, List.of()); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
889 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
890 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
891 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
892 |
* A List of responses relating to this stream. Normally there is only |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
893 |
* one response, but intermediate responses like 100 are allowed |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
894 |
* and must be passed up to higher level before continuing. Deals with races |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
895 |
* such as if responses are returned before the CFs get created by |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
896 |
* getResponseAsync() |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
897 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
898 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
899 |
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
900 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
901 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
902 |
CompletableFuture<Response> getResponseAsync(Executor executor) { |
48083 | 903 |
CompletableFuture<Response> cf; |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
904 |
// The code below deals with race condition that can be caused when |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
905 |
// completeResponse() is being called before getResponseAsync() |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
906 |
synchronized (response_cfs) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
907 |
if (!response_cfs.isEmpty()) { |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
908 |
// This CompletableFuture was created by completeResponse(). |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
909 |
// it will be already completed. |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
910 |
cf = response_cfs.remove(0); |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
911 |
// if we find a cf here it should be already completed. |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
912 |
// finding a non completed cf should not happen. just assert it. |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
913 |
assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
914 |
} else { |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
915 |
// getResponseAsync() is called first. Create a CompletableFuture |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
916 |
// that will be completed by completeResponse() when |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
917 |
// completeResponse() is called. |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
918 |
cf = new MinimalFuture<>(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
919 |
response_cfs.add(cf); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
920 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
921 |
} |
43984 | 922 |
if (executor != null && !cf.isDone()) { |
923 |
// protect from executing later chain of CompletableFuture operations from SelectorManager thread |
|
924 |
cf = cf.thenApplyAsync(r -> r, executor); |
|
925 |
} |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
926 |
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); |
49765 | 927 |
PushGroup<?> pg = exchange.getPushGroup(); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
928 |
if (pg != null) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
929 |
// if an error occurs make sure it is recorded in the PushGroup |
48083 | 930 |
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e))); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
931 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
932 |
return cf; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
933 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
934 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
935 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
936 |
* Completes the first uncompleted CF on list, and removes it. If there is no |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
937 |
* uncompleted CF then creates one (completes it) and adds to list |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
938 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
939 |
void completeResponse(Response resp) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
940 |
synchronized (response_cfs) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
941 |
CompletableFuture<Response> cf; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
942 |
int cfs_len = response_cfs.size(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
943 |
for (int i=0; i<cfs_len; i++) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
944 |
cf = response_cfs.get(i); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
945 |
if (!cf.isDone()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
946 |
Log.logTrace("Completing response (streamid={0}): {1}", |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
947 |
streamid, cf); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
948 |
cf.complete(resp); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
949 |
response_cfs.remove(cf); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
950 |
return; |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
951 |
} // else we found the previous response: just leave it alone. |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
952 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
953 |
cf = MinimalFuture.completedFuture(resp); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
954 |
Log.logTrace("Created completed future (streamid={0}): {1}", |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
955 |
streamid, cf); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
956 |
response_cfs.add(cf); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
957 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
958 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
959 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
960 |
// methods to update state and remove stream when finished |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
961 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
962 |
synchronized void requestSent() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
963 |
requestSent = true; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
964 |
if (responseReceived) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
965 |
close(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
966 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
967 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
968 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
969 |
synchronized void responseReceived() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
970 |
responseReceived = true; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
971 |
if (requestSent) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
972 |
close(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
973 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
974 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
975 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
976 |
/** |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
977 |
* same as above but for errors |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
978 |
*/ |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
979 |
void completeResponseExceptionally(Throwable t) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
980 |
synchronized (response_cfs) { |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
981 |
// use index to avoid ConcurrentModificationException |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
982 |
// caused by removing the CF from within the loop. |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
983 |
for (int i = 0; i < response_cfs.size(); i++) { |
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
984 |
CompletableFuture<Response> cf = response_cfs.get(i); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
985 |
if (!cf.isDone()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
986 |
cf.completeExceptionally(t); |
44639
5c2838d882a5
8178147: Race conditions in timeout handling code in http/2 incubator client
dfuchs
parents:
43999
diff
changeset
|
987 |
response_cfs.remove(i); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
988 |
return; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
989 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
990 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
991 |
response_cfs.add(MinimalFuture.failedFuture(t)); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
992 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
993 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
994 |
|
43984 | 995 |
CompletableFuture<Void> sendBodyImpl() { |
48083 | 996 |
requestBodyCF.whenComplete((v, t) -> requestSent()); |
49765 | 997 |
try { |
998 |
if (requestPublisher != null) { |
|
999 |
final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); |
|
1000 |
requestPublisher.subscribe(requestSubscriber = subscriber); |
|
1001 |
} else { |
|
1002 |
// there is no request body, therefore the request is complete, |
|
1003 |
// END_STREAM has already sent with outgoing headers |
|
1004 |
requestBodyCF.complete(null); |
|
1005 |
} |
|
1006 |
} catch (Throwable t) { |
|
1007 |
cancelImpl(t); |
|
1008 |
requestBodyCF.completeExceptionally(t); |
|
48083 | 1009 |
} |
43984 | 1010 |
return requestBodyCF; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1011 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1012 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1013 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1014 |
void cancel() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1015 |
cancel(new IOException("Stream " + streamid + " cancelled")); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1016 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1017 |
|
49765 | 1018 |
void onSubscriptionError(Throwable t) { |
1019 |
errorRef.compareAndSet(null, t); |
|
1020 |
if (debug.on()) debug.log("Got subscription error: %s", (Object)t); |
|
1021 |
// This is the special case where the subscriber |
|
1022 |
// has requested an illegal number of items. |
|
1023 |
// In this case, the error doesn't come from |
|
1024 |
// upstream, but from downstream, and we need to |
|
1025 |
// handle the error without waiting for the inputQ |
|
1026 |
// to be exhausted. |
|
1027 |
stopRequested = true; |
|
1028 |
sched.runOrSchedule(); |
|
1029 |
} |
|
1030 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1031 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1032 |
void cancel(IOException cause) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1033 |
cancelImpl(cause); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1034 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1035 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1036 |
// This method sends a RST_STREAM frame |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1037 |
void cancelImpl(Throwable e) { |
49765 | 1038 |
errorRef.compareAndSet(null, e); |
1039 |
if (debug.on()) debug.log("cancelling stream {0}: {1}", streamid, e); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1040 |
if (Log.trace()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1041 |
Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1042 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1043 |
boolean closing; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1044 |
if (closing = !closed) { // assigning closing to !closed |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1045 |
synchronized (this) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1046 |
if (closing = !closed) { // assigning closing to !closed |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1047 |
closed=true; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1048 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1049 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1050 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1051 |
if (closing) { // true if the stream has not been closed yet |
49765 | 1052 |
if (responseSubscriber != null || pendingResponseSubscriber != null) |
48083 | 1053 |
sched.runOrSchedule(); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1054 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1055 |
completeResponseExceptionally(e); |
48083 | 1056 |
if (!requestBodyCF.isDone()) { |
49765 | 1057 |
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body.. |
48083 | 1058 |
} |
1059 |
if (responseBodyCF != null) { |
|
49765 | 1060 |
responseBodyCF.completeExceptionally(errorRef.get()); |
48083 | 1061 |
} |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1062 |
try { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1063 |
// will send a RST_STREAM frame |
45531
fb3dbffad37b
8180044: java/net/httpclient/ManyRequests.java failed due to timeout
dfuchs
parents:
44639
diff
changeset
|
1064 |
if (streamid != 0) { |
fb3dbffad37b
8180044: java/net/httpclient/ManyRequests.java failed due to timeout
dfuchs
parents:
44639
diff
changeset
|
1065 |
connection.resetStream(streamid, ResetFrame.CANCEL); |
fb3dbffad37b
8180044: java/net/httpclient/ManyRequests.java failed due to timeout
dfuchs
parents:
44639
diff
changeset
|
1066 |
} |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1067 |
} catch (IOException ex) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1068 |
Log.logError(ex); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1069 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1070 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1071 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1072 |
// This method doesn't send any frame |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1073 |
void close() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1074 |
if (closed) return; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1075 |
synchronized(this) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1076 |
if (closed) return; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1077 |
closed = true; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1078 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1079 |
Log.logTrace("Closing stream {0}", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1080 |
connection.closeStream(streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1081 |
Log.logTrace("Stream {0} closed", streamid); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1082 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1083 |
|
49765 | 1084 |
static class PushedStream<T> extends Stream<T> { |
1085 |
final PushGroup<T> pushGroup; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1086 |
// push streams need the response CF allocated up front as it is |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1087 |
// given directly to user via the multi handler callback function. |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1088 |
final CompletableFuture<Response> pushCF; |
49765 | 1089 |
CompletableFuture<HttpResponse<T>> responseCF; |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1090 |
final HttpRequestImpl pushReq; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1091 |
HttpResponse.BodyHandler<T> pushHandler; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1092 |
|
49765 | 1093 |
PushedStream(PushGroup<T> pushGroup, |
48083 | 1094 |
Http2Connection connection, |
1095 |
Exchange<T> pushReq) { |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1096 |
// ## no request body possible, null window controller |
48083 | 1097 |
super(connection, pushReq, null); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1098 |
this.pushGroup = pushGroup; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1099 |
this.pushReq = pushReq.request(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1100 |
this.pushCF = new MinimalFuture<>(); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1101 |
this.responseCF = new MinimalFuture<>(); |
49765 | 1102 |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1103 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1104 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1105 |
CompletableFuture<HttpResponse<T>> responseCF() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1106 |
return responseCF; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1107 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1108 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1109 |
synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1110 |
this.pushHandler = pushHandler; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1111 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1112 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1113 |
synchronized HttpResponse.BodyHandler<T> getPushHandler() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1114 |
// ignored parameters to function can be used as BodyHandler |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1115 |
return this.pushHandler; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1116 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1117 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1118 |
// Following methods call the super class but in case of |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1119 |
// error record it in the PushGroup. The error method is called |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1120 |
// with a null value when no error occurred (is a no-op) |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1121 |
@Override |
43984 | 1122 |
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
1123 |
return super.sendBodyAsync() |
|
48083 | 1124 |
.whenComplete((ExchangeImpl<T> v, Throwable t) |
1125 |
-> pushGroup.pushError(Utils.getCompletionCause(t))); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1126 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1127 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1128 |
@Override |
43984 | 1129 |
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1130 |
return super.sendHeadersAsync() |
48083 | 1131 |
.whenComplete((ExchangeImpl<T> ex, Throwable t) |
1132 |
-> pushGroup.pushError(Utils.getCompletionCause(t))); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1133 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1134 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1135 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1136 |
CompletableFuture<Response> getResponseAsync(Executor executor) { |
48083 | 1137 |
CompletableFuture<Response> cf = pushCF.whenComplete( |
1138 |
(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t))); |
|
43984 | 1139 |
if(executor!=null && !cf.isDone()) { |
1140 |
cf = cf.thenApplyAsync( r -> r, executor); |
|
1141 |
} |
|
1142 |
return cf; |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1143 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1144 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1145 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1146 |
CompletableFuture<T> readBodyAsync( |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1147 |
HttpResponse.BodyHandler<T> handler, |
43999
4cc44dd9f14f
8164625: Pooled HttpConnection should be removed during close
prappo
parents:
43984
diff
changeset
|
1148 |
boolean returnConnectionToPool, |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1149 |
Executor executor) |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1150 |
{ |
43999
4cc44dd9f14f
8164625: Pooled HttpConnection should be removed during close
prappo
parents:
43984
diff
changeset
|
1151 |
return super.readBodyAsync(handler, returnConnectionToPool, executor) |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1152 |
.whenComplete((v, t) -> pushGroup.pushError(t)); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1153 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1154 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1155 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1156 |
void completeResponse(Response r) { |
48083 | 1157 |
Log.logResponse(r::toString); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1158 |
pushCF.complete(r); // not strictly required for push API |
48083 | 1159 |
// start reading the body using the obtained BodySubscriber |
43984 | 1160 |
CompletableFuture<Void> start = new MinimalFuture<>(); |
1161 |
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1162 |
.whenComplete((T body, Throwable t) -> { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1163 |
if (t != null) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1164 |
responseCF.completeExceptionally(t); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1165 |
} else { |
48083 | 1166 |
HttpResponseImpl<T> resp = |
1167 |
new HttpResponseImpl<>(r.request, r, null, body, getExchange()); |
|
1168 |
responseCF.complete(resp); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1169 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1170 |
}); |
43984 | 1171 |
start.completeAsync(() -> null, getExchange().executor()); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1172 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1173 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1174 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1175 |
void completeResponseExceptionally(Throwable t) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1176 |
pushCF.completeExceptionally(t); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1177 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1178 |
|
48083 | 1179 |
// @Override |
1180 |
// synchronized void responseReceived() { |
|
1181 |
// super.responseReceived(); |
|
1182 |
// } |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1183 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1184 |
// create and return the PushResponseImpl |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1185 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1186 |
protected void handleResponse() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1187 |
responseCode = (int)responseHeaders |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1188 |
.firstValueAsLong(":status") |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1189 |
.orElse(-1); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1190 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1191 |
if (responseCode == -1) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1192 |
completeResponseExceptionally(new IOException("No status code")); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1193 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1194 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1195 |
this.response = new Response( |
49765 | 1196 |
pushReq, exchange, responseHeaders, connection(), |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1197 |
responseCode, HttpClient.Version.HTTP_2); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1198 |
|
48083 | 1199 |
/* TODO: review if needs to be removed |
1200 |
the value is not used, but in case `content-length` doesn't parse |
|
1201 |
as long, there will be NumberFormatException. If left as is, make |
|
1202 |
sure code up the stack handles NFE correctly. */ |
|
1203 |
responseHeaders.firstValueAsLong("content-length"); |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1204 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1205 |
if (Log.headers()) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1206 |
StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); |
49765 | 1207 |
sb.append(" (streamid=").append(streamid).append("):\n"); |
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1208 |
Log.dumpHeaders(sb, " ", responseHeaders); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1209 |
Log.logHeaders(sb.toString()); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1210 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1211 |
|
49765 | 1212 |
rspHeadersConsumer.reset(); |
1213 |
||
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1214 |
// different implementations for normal streams and pushed streams |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1215 |
completeResponse(response); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1216 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1217 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1218 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1219 |
final class StreamWindowUpdateSender extends WindowUpdateSender { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1220 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1221 |
StreamWindowUpdateSender(Http2Connection connection) { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1222 |
super(connection); |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1223 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1224 |
|
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1225 |
@Override |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1226 |
int getStreamId() { |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1227 |
return streamid; |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1228 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1229 |
} |
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1230 |
|
48083 | 1231 |
/** |
1232 |
* Returns true if this exchange was canceled. |
|
1233 |
* @return true if this exchange was canceled. |
|
1234 |
*/ |
|
1235 |
synchronized boolean isCanceled() { |
|
49765 | 1236 |
return errorRef.get() != null; |
48083 | 1237 |
} |
1238 |
||
1239 |
/** |
|
1240 |
* Returns the cause for which this exchange was canceled, if available. |
|
1241 |
* @return the cause for which this exchange was canceled, if available. |
|
1242 |
*/ |
|
1243 |
synchronized Throwable getCancelCause() { |
|
49765 | 1244 |
return errorRef.get(); |
48083 | 1245 |
} |
1246 |
||
1247 |
final String dbgString() { |
|
1248 |
return connection.dbgString() + "/Stream("+streamid+")"; |
|
1249 |
} |
|
49765 | 1250 |
|
1251 |
private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer { |
|
1252 |
||
1253 |
void reset() { |
|
1254 |
super.reset(); |
|
1255 |
responseHeaders.clear(); |
|
1256 |
} |
|
1257 |
||
1258 |
@Override |
|
1259 |
public void onDecoded(CharSequence name, CharSequence value) |
|
1260 |
throws UncheckedIOException |
|
1261 |
{ |
|
1262 |
String n = name.toString(); |
|
1263 |
String v = value.toString(); |
|
1264 |
super.onDecoded(n, v); |
|
1265 |
responseHeaders.addHeader(n, v); |
|
1266 |
if (Log.headers() && Log.trace()) { |
|
1267 |
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", |
|
1268 |
streamid, n, v); |
|
1269 |
} |
|
1270 |
} |
|
1271 |
} |
|
42460
7133f144981a
8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff
changeset
|
1272 |
} |