http-client-branch: RST_STREAM should be handled immediately if received before HEADERS, and subscriber.onError() should be called if received after that. http-client-branch
authordfuchs
Thu, 03 May 2018 18:29:46 +0100
branchhttp-client-branch
changeset 56514 6ef5ca8283a4
parent 56513 17cb1166de81
child 56515 b55b69565ee2
http-client-branch: RST_STREAM should be handled immediately if received before HEADERS, and subscriber.onError() should be called if received after that.
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu May 03 18:27:32 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu May 03 18:29:46 2018 +0100
@@ -25,9 +25,9 @@
 
 package jdk.internal.net.http;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.lang.System.Logger.Level;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -171,7 +171,7 @@
                 Http2Frame frame = inputQ.peek();
                 if (frame instanceof ResetFrame) {
                     inputQ.remove();
-                    handleReset((ResetFrame)frame);
+                    handleReset((ResetFrame)frame, subscriber);
                     return;
                 }
                 DataFrame df = (DataFrame)frame;
@@ -424,25 +424,56 @@
         } else if (closed) {
             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
         } else {
-            // put it in the input queue in order to read all
-            // pending data frames first. Indeed, a server may send
-            // RST_STREAM after sending END_STREAM, in which case we should
-            // ignore it. However, we won't know if we have received END_STREAM
-            // or not until all pending data frames are read.
-            receiveResetFrame(frame);
-            // RST_STREAM was pushed to the queue. It will be handled by
-            // asyncReceive after all pending data frames have been
-            // processed.
-            Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+            Flow.Subscriber<?> subscriber =
+                    responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
+            if (response == null && subscriber == null) {
+                // we haven't receive the headers yet, and won't receive any!
+                // handle reset now.
+                handleReset(frame, subscriber);
+            } else {
+                // put it in the input queue in order to read all
+                // pending data frames first. Indeed, a server may send
+                // RST_STREAM after sending END_STREAM, in which case we should
+                // ignore it. However, we won't know if we have received END_STREAM
+                // or not until all pending data frames are read.
+                receiveResetFrame(frame);
+                // RST_STREAM was pushed to the queue. It will be handled by
+                // asyncReceive after all pending data frames have been
+                // processed.
+                Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+            }
         }
     }
 
-    void handleReset(ResetFrame frame) {
+    void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
         if (!closed) {
-            close();
-            int error = frame.getErrorCode();
-            completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
+            synchronized (this) {
+                if (closed) {
+                    if (debug.on()) debug.log("Stream already closed: ignoring RESET");
+                    return;
+                }
+                closed = true;
+            }
+            try {
+                int error = frame.getErrorCode();
+                IOException e = new IOException("Received RST_STREAM: "
+                        + ErrorFrame.stringForCode(error));
+                if (errorRef.compareAndSet(null, e)) {
+                    if (subscriber != null) {
+                        subscriber.onError(e);
+                    }
+                }
+                completeResponseExceptionally(e);
+                if (!requestBodyCF.isDone()) {
+                    requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
+                }
+                if (responseBodyCF != null) {
+                    responseBodyCF.completeExceptionally(errorRef.get());
+                }
+            } finally {
+                connection.closeStream(streamid);
+            }
         } else {
             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
         }
@@ -1062,7 +1093,13 @@
         try {
             // will send a RST_STREAM frame
             if (streamid != 0) {
-                connection.resetStream(streamid, ResetFrame.CANCEL);
+                e = Utils.getCompletionCause(e);
+                if (e instanceof EOFException) {
+                    // read EOF: no need to try & send reset
+                    connection.closeStream(streamid);
+                } else {
+                    connection.resetStream(streamid, ResetFrame.CANCEL);
+                }
             }
         } catch (IOException ex) {
             Log.logError(ex);