# HG changeset patch # User dfuchs # Date 1525368586 -3600 # Node ID 6ef5ca8283a49bb18a48354d1966ac3bf6b1bab5 # Parent 17cb1166de81faaa52c6ad95564e7e52c2fc1184 http-client-branch: RST_STREAM should be handled immediately if received before HEADERS, and subscriber.onError() should be called if received after that. diff -r 17cb1166de81 -r 6ef5ca8283a4 src/java.net.http/share/classes/jdk/internal/net/http/Stream.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu May 03 18:27:32 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu May 03 18:29:46 2018 +0100 @@ -25,9 +25,9 @@ package jdk.internal.net.http; +import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.System.Logger.Level; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -171,7 +171,7 @@ Http2Frame frame = inputQ.peek(); if (frame instanceof ResetFrame) { inputQ.remove(); - handleReset((ResetFrame)frame); + handleReset((ResetFrame)frame, subscriber); return; } DataFrame df = (DataFrame)frame; @@ -424,25 +424,56 @@ } else if (closed) { Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); } else { - // put it in the input queue in order to read all - // pending data frames first. Indeed, a server may send - // RST_STREAM after sending END_STREAM, in which case we should - // ignore it. However, we won't know if we have received END_STREAM - // or not until all pending data frames are read. - receiveResetFrame(frame); - // RST_STREAM was pushed to the queue. It will be handled by - // asyncReceive after all pending data frames have been - // processed. - Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); + Flow.Subscriber subscriber = + responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber; + if (response == null && subscriber == null) { + // we haven't receive the headers yet, and won't receive any! + // handle reset now. + handleReset(frame, subscriber); + } else { + // put it in the input queue in order to read all + // pending data frames first. Indeed, a server may send + // RST_STREAM after sending END_STREAM, in which case we should + // ignore it. However, we won't know if we have received END_STREAM + // or not until all pending data frames are read. + receiveResetFrame(frame); + // RST_STREAM was pushed to the queue. It will be handled by + // asyncReceive after all pending data frames have been + // processed. + Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); + } } } - void handleReset(ResetFrame frame) { + void handleReset(ResetFrame frame, Flow.Subscriber subscriber) { Log.logTrace("Handling RST_STREAM on stream {0}", streamid); if (!closed) { - close(); - int error = frame.getErrorCode(); - completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); + synchronized (this) { + if (closed) { + if (debug.on()) debug.log("Stream already closed: ignoring RESET"); + return; + } + closed = true; + } + try { + int error = frame.getErrorCode(); + IOException e = new IOException("Received RST_STREAM: " + + ErrorFrame.stringForCode(error)); + if (errorRef.compareAndSet(null, e)) { + if (subscriber != null) { + subscriber.onError(e); + } + } + completeResponseExceptionally(e); + if (!requestBodyCF.isDone()) { + requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body.. + } + if (responseBodyCF != null) { + responseBodyCF.completeExceptionally(errorRef.get()); + } + } finally { + connection.closeStream(streamid); + } } else { Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); } @@ -1062,7 +1093,13 @@ try { // will send a RST_STREAM frame if (streamid != 0) { - connection.resetStream(streamid, ResetFrame.CANCEL); + e = Utils.getCompletionCause(e); + if (e instanceof EOFException) { + // read EOF: no need to try & send reset + connection.closeStream(streamid); + } else { + connection.resetStream(streamid, ResetFrame.CANCEL); + } } } catch (IOException ex) { Log.logError(ex);