equal
deleted
inserted
replaced
37 import java.util.concurrent.ConcurrentLinkedDeque; |
37 import java.util.concurrent.ConcurrentLinkedDeque; |
38 import java.util.concurrent.ConcurrentLinkedQueue; |
38 import java.util.concurrent.ConcurrentLinkedQueue; |
39 import java.util.concurrent.Executor; |
39 import java.util.concurrent.Executor; |
40 import java.util.concurrent.Flow; |
40 import java.util.concurrent.Flow; |
41 import java.util.concurrent.Flow.Subscription; |
41 import java.util.concurrent.Flow.Subscription; |
|
42 import java.util.concurrent.atomic.AtomicBoolean; |
42 import java.util.concurrent.atomic.AtomicReference; |
43 import java.util.concurrent.atomic.AtomicReference; |
43 import java.util.function.BiPredicate; |
44 import java.util.function.BiPredicate; |
44 import java.net.http.HttpClient; |
45 import java.net.http.HttpClient; |
45 import java.net.http.HttpHeaders; |
46 import java.net.http.HttpHeaders; |
46 import java.net.http.HttpRequest; |
47 import java.net.http.HttpRequest; |
130 |
131 |
131 /** True if END_STREAM has been seen in a frame received on this stream. */ |
132 /** True if END_STREAM has been seen in a frame received on this stream. */ |
132 private volatile boolean remotelyClosed; |
133 private volatile boolean remotelyClosed; |
133 private volatile boolean closed; |
134 private volatile boolean closed; |
134 private volatile boolean endStreamSent; |
135 private volatile boolean endStreamSent; |
|
136 |
|
137 final AtomicBoolean deRegistered = new AtomicBoolean(false); |
135 |
138 |
136 // state flags |
139 // state flags |
137 private boolean requestSent, responseReceived; |
140 private boolean requestSent, responseReceived; |
138 |
141 |
139 /** |
142 /** |
183 if (size == 0 && finished) { |
186 if (size == 0 && finished) { |
184 inputQ.remove(); |
187 inputQ.remove(); |
185 Log.logTrace("responseSubscriber.onComplete"); |
188 Log.logTrace("responseSubscriber.onComplete"); |
186 if (debug.on()) debug.log("incoming: onComplete"); |
189 if (debug.on()) debug.log("incoming: onComplete"); |
187 sched.stop(); |
190 sched.stop(); |
|
191 connection.decrementStreamsCount(streamid); |
188 subscriber.onComplete(); |
192 subscriber.onComplete(); |
189 onCompleteCalled = true; |
193 onCompleteCalled = true; |
190 setEndStreamReceived(); |
194 setEndStreamReceived(); |
191 return; |
195 return; |
192 } else if (userSubscription.tryDecrement()) { |
196 } else if (userSubscription.tryDecrement()) { |
247 // closed or half closed. |
251 // closed or half closed. |
248 windowUpdater.update(len); |
252 windowUpdater.update(len); |
249 return false; // more data coming |
253 return false; // more data coming |
250 } |
254 } |
251 return true; // end of stream |
255 return true; // end of stream |
|
256 } |
|
257 |
|
258 boolean deRegister() { |
|
259 return deRegistered.compareAndSet(false, true); |
252 } |
260 } |
253 |
261 |
254 @Override |
262 @Override |
255 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
263 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
256 boolean returnConnectionToPool, |
264 boolean returnConnectionToPool, |
470 } |
478 } |
471 if (responseBodyCF != null) { |
479 if (responseBodyCF != null) { |
472 responseBodyCF.completeExceptionally(errorRef.get()); |
480 responseBodyCF.completeExceptionally(errorRef.get()); |
473 } |
481 } |
474 } finally { |
482 } finally { |
|
483 connection.decrementStreamsCount(streamid); |
475 connection.closeStream(streamid); |
484 connection.closeStream(streamid); |
476 } |
485 } |
477 } else { |
486 } else { |
478 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
487 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
479 } |
488 } |
686 @Override |
695 @Override |
687 void released() { |
696 void released() { |
688 if (streamid > 0) { |
697 if (streamid > 0) { |
689 if (debug.on()) debug.log("Released stream %d", streamid); |
698 if (debug.on()) debug.log("Released stream %d", streamid); |
690 // remove this stream from the Http2Connection map. |
699 // remove this stream from the Http2Connection map. |
|
700 connection.decrementStreamsCount(streamid); |
691 connection.closeStream(streamid); |
701 connection.closeStream(streamid); |
692 } else { |
702 } else { |
693 if (debug.on()) debug.log("Can't release stream %d", streamid); |
703 if (debug.on()) debug.log("Can't release stream %d", streamid); |
694 } |
704 } |
695 } |
705 } |
1100 responseBodyCF.completeExceptionally(errorRef.get()); |
1110 responseBodyCF.completeExceptionally(errorRef.get()); |
1101 } |
1111 } |
1102 try { |
1112 try { |
1103 // will send a RST_STREAM frame |
1113 // will send a RST_STREAM frame |
1104 if (streamid != 0) { |
1114 if (streamid != 0) { |
|
1115 connection.decrementStreamsCount(streamid); |
1105 e = Utils.getCompletionCause(e); |
1116 e = Utils.getCompletionCause(e); |
1106 if (e instanceof EOFException) { |
1117 if (e instanceof EOFException) { |
1107 // read EOF: no need to try & send reset |
1118 // read EOF: no need to try & send reset |
1108 connection.closeStream(streamid); |
1119 connection.closeStream(streamid); |
1109 } else { |
1120 } else { |