src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
branch http-client-branch
changeset 56166 56c52d6417d1
parent 56165 8a6065d830b9
child 56200 d712a6342387
equal deleted inserted replaced
56165:8a6065d830b9 56166:56c52d6417d1
   111 
   111 
   112     long requestContentLen;
   112     long requestContentLen;
   113 
   113 
   114     final Http2Connection connection;
   114     final Http2Connection connection;
   115     final HttpRequestImpl request;
   115     final HttpRequestImpl request;
   116     final DecodingCallback rspHeadersConsumer;
   116     final HeadersConsumer rspHeadersConsumer;
   117     HttpHeadersImpl responseHeaders;
   117     final HttpHeadersImpl responseHeaders;
   118     final HttpHeadersImpl requestPseudoHeaders;
   118     final HttpHeadersImpl requestPseudoHeaders;
   119     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
   119     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
   120     final HttpRequest.BodyPublisher requestPublisher;
   120     final HttpRequest.BodyPublisher requestPublisher;
   121     volatile RequestSubscriber requestSubscriber;
   121     volatile RequestSubscriber requestSubscriber;
   122     volatile int responseCode;
   122     volatile int responseCode;
   230                                        boolean returnConnectionToPool,
   230                                        boolean returnConnectionToPool,
   231                                        Executor executor)
   231                                        Executor executor)
   232     {
   232     {
   233         try {
   233         try {
   234             Log.logTrace("Reading body on stream {0}", streamid);
   234             Log.logTrace("Reading body on stream {0}", streamid);
   235             BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
   235             BodySubscriber<T> bodySubscriber = handler.apply(responseCode, response.headers);
   236             CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
   236             CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
   237 
   237 
   238             PushGroup<?> pg = exchange.getPushGroup();
   238             PushGroup<?> pg = exchange.getPushGroup();
   239             if (pg != null) {
   239             if (pg != null) {
   240                 // if an error occurs make sure it is recorded in the PushGroup
   240                 // if an error occurs make sure it is recorded in the PushGroup
   241                 cf = cf.whenComplete((t, e) -> pg.pushError(e));
   241                 cf = cf.whenComplete((t, e) -> pg.pushError(e));
   242             }
   242             }
   243             return cf;
   243             return cf;
   244         } catch (Throwable t) {
   244         } catch (Throwable t) {
   245             // may be thrown by handler.apply
   245             // may be thrown by handler.apply
       
   246             cancelImpl(t);
   246             return MinimalFuture.failedFuture(t);
   247             return MinimalFuture.failedFuture(t);
   247         }
   248         }
   248     }
   249     }
   249 
   250 
   250     @Override
   251     @Override
   384         if (Log.headers()) {
   385         if (Log.headers()) {
   385             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
   386             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
   386             Log.dumpHeaders(sb, "    ", responseHeaders);
   387             Log.dumpHeaders(sb, "    ", responseHeaders);
   387             Log.logHeaders(sb.toString());
   388             Log.logHeaders(sb.toString());
   388         }
   389         }
       
   390 
       
   391         // this will clear the response headers
       
   392         rspHeadersConsumer.reset();
   389 
   393 
   390         completeResponse(response);
   394         completeResponse(response);
   391     }
   395     }
   392 
   396 
   393     void incoming_reset(ResetFrame frame) {
   397     void incoming_reset(ResetFrame frame) {
  1143                 sb.append(" (streamid=").append(streamid).append("): ");
  1147                 sb.append(" (streamid=").append(streamid).append("): ");
  1144                 Log.dumpHeaders(sb, "    ", responseHeaders);
  1148                 Log.dumpHeaders(sb, "    ", responseHeaders);
  1145                 Log.logHeaders(sb.toString());
  1149                 Log.logHeaders(sb.toString());
  1146             }
  1150             }
  1147 
  1151 
       
  1152             rspHeadersConsumer.reset();
       
  1153 
  1148             // different implementations for normal streams and pushed streams
  1154             // different implementations for normal streams and pushed streams
  1149             completeResponse(response);
  1155             completeResponse(response);
  1150         }
  1156         }
  1151     }
  1157     }
  1152 
  1158 
  1181     final String dbgString() {
  1187     final String dbgString() {
  1182         return connection.dbgString() + "/Stream("+streamid+")";
  1188         return connection.dbgString() + "/Stream("+streamid+")";
  1183     }
  1189     }
  1184 
  1190 
  1185     private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {
  1191     private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {
       
  1192 
       
  1193         void reset() {
       
  1194             responseHeaders.clear();
       
  1195         }
  1186 
  1196 
  1187         @Override
  1197         @Override
  1188         public void onDecoded(CharSequence name, CharSequence value)
  1198         public void onDecoded(CharSequence name, CharSequence value)
  1189                 throws UncheckedIOException
  1199                 throws UncheckedIOException
  1190         {
  1200         {