src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
branch http-client-branch
changeset 56165 8a6065d830b9
parent 56126 86e628130926
child 56167 96fa4f49a9ff
equal deleted inserted replaced
56164:4db4bec0e5bb 56165:8a6065d830b9
    27 
    27 
    28 import java.io.EOFException;
    28 import java.io.EOFException;
    29 import java.lang.System.Logger.Level;
    29 import java.lang.System.Logger.Level;
    30 import java.nio.ByteBuffer;
    30 import java.nio.ByteBuffer;
    31 import java.util.concurrent.CompletableFuture;
    31 import java.util.concurrent.CompletableFuture;
    32 import java.util.concurrent.CompletionStage;
       
    33 import java.util.concurrent.Executor;
    32 import java.util.concurrent.Executor;
    34 import java.util.function.BiConsumer;
    33 import java.util.concurrent.atomic.AtomicLong;
    35 import java.util.function.Consumer;
    34 import java.util.function.Consumer;
    36 import java.util.function.Function;
    35 import java.util.function.Function;
    37 import java.net.http.HttpHeaders;
    36 import java.net.http.HttpHeaders;
    38 import java.net.http.HttpResponse;
    37 import java.net.http.HttpResponse;
    39 import jdk.internal.net.http.ResponseContent.BodyParser;
    38 import jdk.internal.net.http.ResponseContent.BodyParser;
    65 
    64 
    66     // Revisit: can we get rid of this?
    65     // Revisit: can we get rid of this?
    67     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
    66     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
    68     private volatile State readProgress = State.INITIAL;
    67     private volatile State readProgress = State.INITIAL;
    69     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    68     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    70     final System.Logger  debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
    69     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    71 
    70     final static AtomicLong responseCount = new AtomicLong();
       
    71     final long id = responseCount.incrementAndGet();
    72 
    72 
    73     Http1Response(HttpConnection conn,
    73     Http1Response(HttpConnection conn,
    74                   Http1Exchange<T> exchange,
    74                   Http1Exchange<T> exchange,
    75                   Http1AsyncReceiver asyncReceiver) {
    75                   Http1AsyncReceiver asyncReceiver) {
    76         this.readProgress = State.INITIAL;
    76         this.readProgress = State.INITIAL;
    78         this.exchange = exchange;
    78         this.exchange = exchange;
    79         this.connection = conn;
    79         this.connection = conn;
    80         this.asyncReceiver = asyncReceiver;
    80         this.asyncReceiver = asyncReceiver;
    81         headersReader = new HeadersReader(this::advance);
    81         headersReader = new HeadersReader(this::advance);
    82         bodyReader = new BodyReader(this::advance);
    82         bodyReader = new BodyReader(this::advance);
       
    83     }
       
    84 
       
    85     String dbgTag;
       
    86     private String dbgString() {
       
    87         String dbg = dbgTag;
       
    88         if (dbg == null) {
       
    89             String cdbg = connection.dbgTag;
       
    90             if (cdbg != null) {
       
    91                 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";
       
    92             } else {
       
    93                 dbg = "Http1Response(id=" + id + ")";
       
    94             }
       
    95         }
       
    96         return dbg;
       
    97     }
       
    98 
       
    99     // The ClientRefCountTracker is used to track the state
       
   100     // of a pending operation. Although there usually is a single
       
   101     // point where the operation starts, it may terminate at
       
   102     // different places.
       
   103     private final class ClientRefCountTracker {
       
   104         final HttpClientImpl client = connection.client();
       
   105         // state & 0x01 != 0 => acquire called
       
   106         // state & 0x02 != 0 => tryRelease called
       
   107         byte state;
       
   108 
       
   109         public synchronized void acquire() {
       
   110             if (state == 0) {
       
   111                 // increment the reference count on the HttpClientImpl
       
   112                 // to prevent the SelectorManager thread from exiting
       
   113                 // until our operation is complete.
       
   114                 debug.log(Level.DEBUG, "incrementing ref count for %s", client);
       
   115                 client.reference();
       
   116                 state = 0x01;
       
   117             } else {
       
   118                 assert (state & 0x01) == 0 : "reference count already incremented";
       
   119             }
       
   120         }
       
   121 
       
   122         public synchronized void tryRelease() {
       
   123             if (state == 0x01) {
       
   124                 // decrement the reference count on the HttpClientImpl
       
   125                 // to allow the SelectorManager thread to exit if no
       
   126                 // other operation is pending and the facade is no
       
   127                 // longer referenced.
       
   128                 debug.log(Level.DEBUG, "decrementing ref count for %s", client);
       
   129                 client.unreference();
       
   130                 state |= 0x02;
       
   131             }
       
   132         }
    83     }
   133     }
    84 
   134 
    85    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
   135    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
    86         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
   136         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
    87                 + asyncReceiver.remaining() +") "  + readProgress);
   137                 + asyncReceiver.remaining() +") "  + readProgress);
   155         } else {
   205         } else {
   156             return readBody(HttpResponse.BodySubscriber.discard(), true, executor);
   206             return readBody(HttpResponse.BodySubscriber.discard(), true, executor);
   157         }
   207         }
   158     }
   208     }
   159 
   209 
       
   210 
   160     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
   211     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
   161                                          boolean return2Cache,
   212                                          boolean return2Cache,
   162                                          Executor executor) {
   213                                          Executor executor) {
   163         this.return2Cache = return2Cache;
   214         this.return2Cache = return2Cache;
   164         final HttpResponse.BodySubscriber<U> pusher = p;
   215         final HttpResponse.BodySubscriber<U> pusher = p;
   171 
   222 
   172         // expect-continue reads headers and body twice.
   223         // expect-continue reads headers and body twice.
   173         // if we reach here, we must reset the headersReader state.
   224         // if we reach here, we must reset the headersReader state.
   174         asyncReceiver.unsubscribe(headersReader);
   225         asyncReceiver.unsubscribe(headersReader);
   175         headersReader.reset();
   226         headersReader.reset();
       
   227         ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
   176 
   228 
   177         executor.execute(() -> {
   229         executor.execute(() -> {
   178             try {
   230             try {
   179                 HttpClientImpl client = connection.client();
       
   180                 content = new ResponseContent(
   231                 content = new ResponseContent(
   181                         connection, clen, headers, pusher,
   232                         connection, clen, headers, pusher,
   182                         this::onFinished
   233                         this::onFinished
   183                 );
   234                 );
   184                 if (cf.isCompletedExceptionally()) {
   235                 if (cf.isCompletedExceptionally()) {
   187                     return;
   238                     return;
   188                 }
   239                 }
   189                 // increment the reference count on the HttpClientImpl
   240                 // increment the reference count on the HttpClientImpl
   190                 // to prevent the SelectorManager thread from exiting until
   241                 // to prevent the SelectorManager thread from exiting until
   191                 // the body is fully read.
   242                 // the body is fully read.
   192                 client.reference();
   243                 refCountTracker.acquire();
   193                 bodyReader.start(content.getBodyParser(
   244                 bodyReader.start(content.getBodyParser(
   194                     (t) -> {
   245                     (t) -> {
   195                         try {
   246                         try {
   196                             if (t != null) {
   247                             if (t != null) {
   197                                 pusher.onError(t);
   248                                 pusher.onError(t);
   198                                 connection.close();
   249                                 connection.close();
   199                                 if (!cf.isDone())
   250                                 if (!cf.isDone())
   200                                     cf.completeExceptionally(t);
   251                                     cf.completeExceptionally(t);
   201                             }
   252                             }
   202                         } finally {
   253                         } finally {
   203                             // decrement the reference count on the HttpClientImpl
       
   204                             // to allow the SelectorManager thread to exit if no
       
   205                             // other operation is pending and the facade is no
       
   206                             // longer referenced.
       
   207                             client.unreference();
       
   208                             bodyReader.onComplete(t);
   254                             bodyReader.onComplete(t);
   209                         }
   255                         }
   210                     }));
   256                     }));
   211                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
   257                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
   212                 asyncReceiver.subscribe(bodyReader);
   258                 asyncReceiver.subscribe(bodyReader);
   214                 // Make sure to keep a reference to asyncReceiver from
   260                 // Make sure to keep a reference to asyncReceiver from
   215                 // within this
   261                 // within this
   216                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
   262                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
   217                     t = Utils.getCompletionCause(t);
   263                     t = Utils.getCompletionCause(t);
   218                     try {
   264                     try {
   219                         if (t != null) {
   265                         if (t == null) {
   220                             debug.log(Level.DEBUG, () ->
   266                             debug.log(Level.DEBUG, () ->
   221                                     "Finished reading body: " + s);
   267                                     "Finished reading body: " + s);
   222                             assert s == State.READING_BODY;
   268                             assert s == State.READING_BODY;
   223                         }
   269                         }
   224                         if (t != null && !cf.isDone()) {
   270                         if (t != null && !cf.isDone()) {
   226                             cf.completeExceptionally(t);
   272                             cf.completeExceptionally(t);
   227                         }
   273                         }
   228                     } catch (Throwable x) {
   274                     } catch (Throwable x) {
   229                         // not supposed to happen
   275                         // not supposed to happen
   230                         asyncReceiver.onReadError(x);
   276                         asyncReceiver.onReadError(x);
       
   277                     } finally {
       
   278                         // we're done: release the ref count for
       
   279                         // the current operation.
       
   280                         refCountTracker.tryRelease();
   231                     }
   281                     }
   232                 });
   282                 });
   233                 connection.addTrailingOperation(trailingOp);
   283                 connection.addTrailingOperation(trailingOp);
   234             } catch (Throwable t) {
   284             } catch (Throwable t) {
   235                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
   285                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
   241                 } finally {
   291                 } finally {
   242                     asyncReceiver.onReadError(t);
   292                     asyncReceiver.onReadError(t);
   243                 }
   293                 }
   244             }
   294             }
   245         });
   295         });
   246         p.getBody().whenComplete((U u, Throwable t) -> {
   296         try {
   247             if (t == null)
   297             p.getBody().whenComplete((U u, Throwable t) -> {
   248                 cf.complete(u);
   298                 if (t == null)
   249             else
   299                     cf.complete(u);
   250                 cf.completeExceptionally(t);
   300                 else
       
   301                     cf.completeExceptionally(t);
       
   302             });
       
   303         } catch (Throwable t) {
       
   304             cf.completeExceptionally(t);
       
   305             asyncReceiver.setRetryOnError(false);
       
   306             asyncReceiver.onReadError(t);
       
   307         }
       
   308 
       
   309         return cf.whenComplete((s,t) -> {
       
   310             if (t != null) {
       
   311                 // If an exception occurred, release the
       
   312                 // ref count for the current operation, as
       
   313                 // it may never be triggered otherwise
       
   314                 // (BodySubscriber ofInputStream)
       
   315                 // If there was no exception then the
       
   316                 // ref count will be/have been released when
       
   317                 // the last byte of the response is/was received
       
   318                 refCountTracker.tryRelease();
       
   319             }
   251         });
   320         });
   252 
       
   253         return cf;
       
   254     }
   321     }
   255 
   322 
   256 
   323 
   257     private void onFinished() {
   324     private void onFinished() {
   258         asyncReceiver.clear();
   325         asyncReceiver.clear();