src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
changeset 50681 4254bed3c09d
parent 49944 4690a2871b44
child 51462 9d7d74c6f2cb
child 56795 03ece2518428
equal deleted inserted replaced
50678:818a23db260c 50681:4254bed3c09d
    42 import jdk.internal.net.http.ResponseContent.BodyParser;
    42 import jdk.internal.net.http.ResponseContent.BodyParser;
    43 import jdk.internal.net.http.common.Log;
    43 import jdk.internal.net.http.common.Log;
    44 import jdk.internal.net.http.common.Logger;
    44 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.MinimalFuture;
    45 import jdk.internal.net.http.common.MinimalFuture;
    46 import jdk.internal.net.http.common.Utils;
    46 import jdk.internal.net.http.common.Utils;
    47 
       
    48 import static java.net.http.HttpClient.Version.HTTP_1_1;
    47 import static java.net.http.HttpClient.Version.HTTP_1_1;
    49 import static java.net.http.HttpResponse.BodySubscribers.discarding;
    48 import static java.net.http.HttpResponse.BodySubscribers.discarding;
       
    49 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
       
    50 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED;
    50 
    51 
    51 /**
    52 /**
    52  * Handles a HTTP/1.1 response (headers + body).
    53  * Handles a HTTP/1.1 response (headers + body).
    53  * There can be more than one of these per Http exchange.
    54  * There can be more than one of these per Http exchange.
    54  */
    55  */
    74     private volatile State readProgress = State.INITIAL;
    75     private volatile State readProgress = State.INITIAL;
    75 
    76 
    76     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    77     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    77     final static AtomicLong responseCount = new AtomicLong();
    78     final static AtomicLong responseCount = new AtomicLong();
    78     final long id = responseCount.incrementAndGet();
    79     final long id = responseCount.incrementAndGet();
       
    80     private Http1HeaderParser hd;
    79 
    81 
    80     Http1Response(HttpConnection conn,
    82     Http1Response(HttpConnection conn,
    81                   Http1Exchange<T> exchange,
    83                   Http1Exchange<T> exchange,
    82                   Http1AsyncReceiver asyncReceiver) {
    84                   Http1AsyncReceiver asyncReceiver) {
    83         this.readProgress = State.INITIAL;
    85         this.readProgress = State.INITIAL;
    85         this.exchange = exchange;
    87         this.exchange = exchange;
    86         this.connection = conn;
    88         this.connection = conn;
    87         this.asyncReceiver = asyncReceiver;
    89         this.asyncReceiver = asyncReceiver;
    88         headersReader = new HeadersReader(this::advance);
    90         headersReader = new HeadersReader(this::advance);
    89         bodyReader = new BodyReader(this::advance);
    91         bodyReader = new BodyReader(this::advance);
       
    92 
       
    93         hd = new Http1HeaderParser();
       
    94         readProgress = State.READING_HEADERS;
       
    95         headersReader.start(hd);
       
    96         asyncReceiver.subscribe(headersReader);
    90     }
    97     }
    91 
    98 
    92     String dbgTag;
    99     String dbgTag;
    93     private String dbgString() {
   100     private String dbgString() {
    94         String dbg = dbgTag;
   101         String dbg = dbgTag;
   148             }
   155             }
   149             state |= 0x02;
   156             state |= 0x02;
   150         }
   157         }
   151     }
   158     }
   152 
   159 
       
   160     private volatile boolean firstTimeAround = true;
       
   161 
   153     public CompletableFuture<Response> readHeadersAsync(Executor executor) {
   162     public CompletableFuture<Response> readHeadersAsync(Executor executor) {
   154         if (debug.on())
   163         if (debug.on())
   155             debug.log("Reading Headers: (remaining: "
   164             debug.log("Reading Headers: (remaining: "
   156                       + asyncReceiver.remaining() +") "  + readProgress);
   165                       + asyncReceiver.remaining() +") "  + readProgress);
   157         // with expect continue we will resume reading headers + body.
   166 
   158         asyncReceiver.unsubscribe(bodyReader);
   167         if (firstTimeAround) {
   159         bodyReader.reset();
   168             if (debug.on()) debug.log("First time around");
   160         Http1HeaderParser hd = new Http1HeaderParser();
   169             firstTimeAround = false;
   161         readProgress = State.READING_HEADERS;
   170         } else {
   162         headersReader.start(hd);
   171             // with expect continue we will resume reading headers + body.
   163         asyncReceiver.subscribe(headersReader);
   172             asyncReceiver.unsubscribe(bodyReader);
       
   173             bodyReader.reset();
       
   174 
       
   175             hd = new Http1HeaderParser();
       
   176             readProgress = State.READING_HEADERS;
       
   177             headersReader.reset();
       
   178             headersReader.start(hd);
       
   179             asyncReceiver.subscribe(headersReader);
       
   180         }
       
   181 
   164         CompletableFuture<State> cf = headersReader.completion();
   182         CompletableFuture<State> cf = headersReader.completion();
   165         assert cf != null : "parsing not started";
   183         assert cf != null : "parsing not started";
       
   184         if (debug.on()) {
       
   185             debug.log("headersReader is %s",
       
   186                     cf == null ? "not yet started"
       
   187                             : cf.isDone() ? "already completed"
       
   188                             : "not yet completed");
       
   189         }
   166 
   190 
   167         Function<State, Response> lambda = (State completed) -> {
   191         Function<State, Response> lambda = (State completed) -> {
   168                 assert completed == State.READING_HEADERS;
   192                 assert completed == State.READING_HEADERS;
   169                 if (debug.on())
   193                 if (debug.on())
   170                     debug.log("Reading Headers: creating Response object;"
   194                     debug.log("Reading Headers: creating Response object;"
   205     synchronized boolean finished() {
   229     synchronized boolean finished() {
   206         return finished;
   230         return finished;
   207     }
   231     }
   208 
   232 
   209     int fixupContentLen(int clen) {
   233     int fixupContentLen(int clen) {
   210         if (request.method().equalsIgnoreCase("HEAD")) {
   234         if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) {
   211             return 0;
   235             return 0;
   212         }
   236         }
   213         if (clen == -1) {
   237         if (clen == -1) {
   214             if (headers.firstValue("Transfer-encoding").orElse("")
   238             if (headers.firstValue("Transfer-encoding").orElse("")
   215                        .equalsIgnoreCase("chunked")) {
   239                        .equalsIgnoreCase("chunked")) {
   287                 if (t == null) {
   311                 if (t == null) {
   288                     assert subscribed;
   312                     assert subscribed;
   289                     try {
   313                     try {
   290                         userSubscriber.onComplete();
   314                         userSubscriber.onComplete();
   291                     } catch (Throwable x) {
   315                     } catch (Throwable x) {
       
   316                         // Simply propagate the error by calling
       
   317                         // onError on the user subscriber, and let the
       
   318                         // connection be reused since we should have received
       
   319                         // and parsed all the bytes when we reach here.
       
   320                         // If onError throws in turn, then we will simply
       
   321                         // let that new exception flow up to the caller
       
   322                         // and let it deal with it.
       
   323                         // (i.e: log and close the connection)
       
   324                         // Note that rethrowing here could introduce a
       
   325                         // race that might cause the next send() operation to
       
   326                         // fail as the connection has already been put back
       
   327                         // into the cache when we reach here.
   292                         propagateError(t = withError = Utils.getCompletionCause(x));
   328                         propagateError(t = withError = Utils.getCompletionCause(x));
   293                         // rethrow and let the caller deal with it.
       
   294                         // (i.e: log and close the connection)
       
   295                         // arguably we could decide to not throw and let the
       
   296                         // connection be reused since we should have received and
       
   297                         // parsed all the bytes when we reach here.
       
   298                         throw x;
       
   299                     }
   329                     }
   300                 } else {
   330                 } else {
   301                     propagateError(t);
   331                     propagateError(t);
   302                 }
   332                 }
   303             }
   333             }
   611             return needsMore;
   641             return needsMore;
   612         }
   642         }
   613 
   643 
   614         @Override
   644         @Override
   615         public final void onReadError(Throwable t) {
   645         public final void onReadError(Throwable t) {
       
   646             t = wrapWithExtraDetail(t, parser::currentStateMessage);
   616             Http1Response.this.onReadError(t);
   647             Http1Response.this.onReadError(t);
   617         }
   648         }
   618 
   649 
   619         @Override
   650         @Override
   620         final void handle(ByteBuffer b,
   651         final void handle(ByteBuffer b,
   690             return accept(b, parser, cf);
   721             return accept(b, parser, cf);
   691         }
   722         }
   692 
   723 
   693         @Override
   724         @Override
   694         public final void onReadError(Throwable t) {
   725         public final void onReadError(Throwable t) {
       
   726             t = wrapWithExtraDetail(t, parser::currentStateMessage);
   695             Http1Response.this.onReadError(t);
   727             Http1Response.this.onReadError(t);
   696         }
   728         }
   697 
   729 
   698         @Override
   730         @Override
   699         public AbstractSubscription subscription() {
   731         public AbstractSubscription subscription() {