http-client-branch: RST_STREAM should be handled immediately if received before HEADERS, and subscriber.onError() should be called if received after that.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu May 03 18:27:32 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu May 03 18:29:46 2018 +0100
@@ -25,9 +25,9 @@
package jdk.internal.net.http;
+import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.lang.System.Logger.Level;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -171,7 +171,7 @@
Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame) {
inputQ.remove();
- handleReset((ResetFrame)frame);
+ handleReset((ResetFrame)frame, subscriber);
return;
}
DataFrame df = (DataFrame)frame;
@@ -424,25 +424,56 @@
} else if (closed) {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
} else {
- // put it in the input queue in order to read all
- // pending data frames first. Indeed, a server may send
- // RST_STREAM after sending END_STREAM, in which case we should
- // ignore it. However, we won't know if we have received END_STREAM
- // or not until all pending data frames are read.
- receiveResetFrame(frame);
- // RST_STREAM was pushed to the queue. It will be handled by
- // asyncReceive after all pending data frames have been
- // processed.
- Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+ Flow.Subscriber<?> subscriber =
+ responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
+ if (response == null && subscriber == null) {
+ // we haven't receive the headers yet, and won't receive any!
+ // handle reset now.
+ handleReset(frame, subscriber);
+ } else {
+ // put it in the input queue in order to read all
+ // pending data frames first. Indeed, a server may send
+ // RST_STREAM after sending END_STREAM, in which case we should
+ // ignore it. However, we won't know if we have received END_STREAM
+ // or not until all pending data frames are read.
+ receiveResetFrame(frame);
+ // RST_STREAM was pushed to the queue. It will be handled by
+ // asyncReceive after all pending data frames have been
+ // processed.
+ Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+ }
}
}
- void handleReset(ResetFrame frame) {
+ void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
if (!closed) {
- close();
- int error = frame.getErrorCode();
- completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
+ synchronized (this) {
+ if (closed) {
+ if (debug.on()) debug.log("Stream already closed: ignoring RESET");
+ return;
+ }
+ closed = true;
+ }
+ try {
+ int error = frame.getErrorCode();
+ IOException e = new IOException("Received RST_STREAM: "
+ + ErrorFrame.stringForCode(error));
+ if (errorRef.compareAndSet(null, e)) {
+ if (subscriber != null) {
+ subscriber.onError(e);
+ }
+ }
+ completeResponseExceptionally(e);
+ if (!requestBodyCF.isDone()) {
+ requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
+ }
+ if (responseBodyCF != null) {
+ responseBodyCF.completeExceptionally(errorRef.get());
+ }
+ } finally {
+ connection.closeStream(streamid);
+ }
} else {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
}
@@ -1062,7 +1093,13 @@
try {
// will send a RST_STREAM frame
if (streamid != 0) {
- connection.resetStream(streamid, ResetFrame.CANCEL);
+ e = Utils.getCompletionCause(e);
+ if (e instanceof EOFException) {
+ // read EOF: no need to try & send reset
+ connection.closeStream(streamid);
+ } else {
+ connection.resetStream(streamid, ResetFrame.CANCEL);
+ }
}
} catch (IOException ex) {
Log.logError(ex);