8178699: Fail to send async requests if server doesn't response the first one
Reviewed-by: dfuchs
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Sun Jun 25 23:51:46 2017 -0700
@@ -266,7 +266,6 @@
throws IOException, InterruptedException
{
establishExchange(connection);
- exchImpl.setClientForRequest(requestProcessor);
if (request.expectContinue()) {
Log.logTrace("Sending Expect: 100-Continue");
request.addSystemHeader("Expect", "100-Continue");
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Sun Jun 25 23:51:46 2017 -0700
@@ -26,7 +26,6 @@
package jdk.incubator.http;
import java.io.IOException;
-import jdk.incubator.http.RequestProcessors.ProcessorBase;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import jdk.incubator.http.internal.common.MinimalFuture;
@@ -50,11 +49,8 @@
final Exchange<T> exchange;
ExchangeImpl(Exchange<T> e) {
+ // e == null means a http/2 pushed stream
this.exchange = e;
- if (e != null) {
- // e == null means a http/2 pushed stream, therefore no request
- setClientForRequest(e.requestProcessor);
- }
}
final Exchange<T> getExchange() {
@@ -134,22 +130,6 @@
boolean returnConnectionToPool,
Executor executor);
- // Builtin processors need access to HttpClientImpl
- final void setClientForResponse(HttpResponse.BodyProcessor<T> proc) {
- if (proc instanceof ResponseProcessors.AbstractProcessor) {
- ResponseProcessors.AbstractProcessor<T> abProc =
- (ResponseProcessors.AbstractProcessor<T>)proc;
- abProc.setClient(exchange.client());
- }
- }
-
- final void setClientForRequest(HttpRequest.BodyProcessor proc) {
- if (proc instanceof ProcessorBase) {
- ProcessorBase abProc = (ProcessorBase)proc;
- abProc.setClient(exchange.client());
- }
- }
-
/**
* Async version of getResponse. Completes before body is read.
*/
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Sun Jun 25 23:51:46 2017 -0700
@@ -96,7 +96,6 @@
{
BodyProcessor<T> processor = handler.apply(response.responseCode(),
response.responseHeaders());
- setClientForResponse(processor);
CompletableFuture<T> bodyCF = response.readBody(processor,
returnConnectionToPool,
this::executeInline);
@@ -122,7 +121,6 @@
{
BodyProcessor<T> processor = handler.apply(response.responseCode(),
response.responseHeaders());
- setClientForResponse(processor);
CompletableFuture<T> bodyCF = response.readBody(processor,
returnConnectionToPool,
executor);
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java Sun Jun 25 23:51:46 2017 -0700
@@ -33,7 +33,6 @@
import java.util.Set;
import java.net.InetSocketAddress;
import jdk.incubator.http.HttpConnection.Mode;
-import jdk.incubator.http.RequestProcessors.ProcessorBase;
import java.nio.charset.StandardCharsets;
import static java.nio.charset.StandardCharsets.US_ASCII;
import java.util.concurrent.CompletableFuture;
@@ -274,9 +273,7 @@
setFinished();
}
- class StreamSubscriber extends ProcessorBase
- implements Flow.Subscriber<ByteBuffer>
- {
+ class StreamSubscriber implements Flow.Subscriber<ByteBuffer> {
volatile Flow.Subscription subscription;
volatile boolean includeHeaders;
@@ -361,13 +358,11 @@
throws IOException
{
StreamSubscriber subscriber = new StreamSubscriber(includeHeaders);
- subscriber.setClient(client);
requestProc.subscribe(subscriber);
waitForCompletion();
}
- class FixedContentSubscriber extends ProcessorBase
- implements Flow.Subscriber<ByteBuffer>
+ class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer>
{
volatile Flow.Subscription subscription;
volatile boolean includeHeaders;
@@ -451,7 +446,6 @@
return;
}
FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders);
- subscriber.setClient(client);
requestProc.subscribe(subscriber);
waitForCompletion();
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestProcessors.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestProcessors.java Sun Jun 25 23:51:46 2017 -0700
@@ -38,29 +38,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import jdk.incubator.http.internal.common.Utils;
class RequestProcessors {
- // common base class for Publisher and Subscribers used here
- abstract static class ProcessorBase {
- HttpClientImpl client;
- synchronized void setClient(HttpClientImpl client) {
- this.client = client;
- }
-
- synchronized HttpClientImpl getClient() {
- return client;
- }
- }
-
- static class ByteArrayProcessor extends ProcessorBase
- implements HttpRequest.BodyProcessor
- {
+ static class ByteArrayProcessor implements HttpRequest.BodyProcessor {
private volatile Flow.Publisher<ByteBuffer> delegate;
private final int length;
private final byte[] content;
@@ -105,9 +90,7 @@
}
// This implementation has lots of room for improvement.
- static class IterableProcessor extends ProcessorBase
- implements HttpRequest.BodyProcessor
- {
+ static class IterableProcessor implements HttpRequest.BodyProcessor {
private volatile Flow.Publisher<ByteBuffer> delegate;
private final Iterable<byte[]> content;
private volatile long contentLength;
@@ -202,8 +185,7 @@
}
}
- static class EmptyProcessor extends ProcessorBase implements HttpRequest.BodyProcessor
- {
+ static class EmptyProcessor implements HttpRequest.BodyProcessor {
PseudoPublisher<ByteBuffer> delegate = new PseudoPublisher<>();
@Override
@@ -303,9 +285,7 @@
}
- static class InputStreamProcessor extends ProcessorBase
- implements HttpRequest.BodyProcessor
- {
+ static class InputStreamProcessor implements HttpRequest.BodyProcessor {
private final Supplier<? extends InputStream> streamSupplier;
private Flow.Publisher<ByteBuffer> delegate;
@@ -315,11 +295,7 @@
@Override
public synchronized void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
- if (!(subscriber instanceof ProcessorBase)) {
- throw new UnsupportedOperationException();
- }
- ProcessorBase base = (ProcessorBase)subscriber;
- HttpClientImpl client = base.getClient();
+
InputStream is = streamSupplier.get();
if (is == null) {
throw new UncheckedIOException(new IOException("no inputstream supplied"));
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java Sun Jun 25 23:51:46 2017 -0700
@@ -51,21 +51,7 @@
class ResponseProcessors {
- abstract static class AbstractProcessor<T>
- implements HttpResponse.BodyProcessor<T>
- {
- HttpClientImpl client;
-
- synchronized void setClient(HttpClientImpl client) {
- this.client = client;
- }
-
- synchronized HttpClientImpl getClient() {
- return client;
- }
- }
-
- static class ConsumerProcessor extends AbstractProcessor<Void> {
+ static class ConsumerProcessor implements HttpResponse.BodyProcessor<Void> {
private final Consumer<Optional<byte[]>> consumer;
private Flow.Subscription subscription;
private final CompletableFuture<Void> result = new MinimalFuture<>();
@@ -106,7 +92,7 @@
}
- static class PathProcessor extends AbstractProcessor<Path> {
+ static class PathProcessor implements HttpResponse.BodyProcessor<Path> {
private final Path file;
private final CompletableFuture<Path> result = new MinimalFuture<>();
@@ -163,7 +149,7 @@
}
}
- static class ByteArrayProcessor<T> extends AbstractProcessor<T> {
+ static class ByteArrayProcessor<T> implements HttpResponse.BodyProcessor<T> {
private final Function<byte[], T> finisher;
private final CompletableFuture<T> result = new MinimalFuture<>();
private final List<ByteBuffer> received = new ArrayList<>();
@@ -301,7 +287,7 @@
/**
* Currently this consumes all of the data and ignores it
*/
- static class NullProcessor<T> extends AbstractProcessor<T> {
+ static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> {
Flow.Subscription subscription;
final CompletableFuture<T> cf = new MinimalFuture<>();
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Sun Jun 25 23:51:46 2017 -0700
@@ -153,7 +153,6 @@
{
Log.logTrace("Reading body on stream {0}", streamid);
responseProcessor = handler.apply(responseCode, responseHeaders);
- setClientForResponse(responseProcessor);
publisher.subscribe(responseProcessor);
CompletableFuture<T> cf = receiveData(executor);
@@ -573,10 +572,7 @@
connection.putStream(this, streamid);
}
- class RequestSubscriber
- extends RequestProcessors.ProcessorBase
- implements Flow.Subscriber<ByteBuffer>
- {
+ class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
// can be < 0 if the actual length is not known.
private volatile long remainingContentLength;
private volatile Subscription subscription;
@@ -768,7 +764,6 @@
CompletableFuture<Void> sendBodyImpl() {
RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
- subscriber.setClient(client);
requestProcessor.subscribe(subscriber);
requestBodyCF.whenComplete((v,t) -> requestSent());
return requestBodyCF;
--- a/jdk/test/java/net/httpclient/SmokeTest.java Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/test/java/net/httpclient/SmokeTest.java Sun Jun 25 23:51:46 2017 -0700
@@ -23,7 +23,7 @@
/*
* @test
- * @bug 8087112
+ * @bug 8087112 8178699
* @modules jdk.incubator.httpclient
* java.logging
* jdk.httpserver