src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
branch http-client-branch
changeset 56010 782b2f2d1e76
parent 56009 cf8792f51dee
child 56025 1f88e1587067
equal deleted inserted replaced
56009:cf8792f51dee 56010:782b2f2d1e76
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
       
    28 import jdk.incubator.http.internal.common.MinimalFuture;
    28 import java.io.BufferedReader;
    29 import java.io.BufferedReader;
    29 import java.io.IOException;
    30 import java.io.IOException;
    30 import java.io.InputStream;
    31 import java.io.InputStream;
    31 import java.net.URI;
    32 import java.net.URI;
    32 import jdk.incubator.http.ResponseSubscribers.MultiSubscriberImpl;
       
    33 import static jdk.incubator.http.internal.common.Utils.unchecked;
    33 import static jdk.incubator.http.internal.common.Utils.unchecked;
    34 import static jdk.incubator.http.internal.common.Utils.charsetFrom;
    34 import static jdk.incubator.http.internal.common.Utils.charsetFrom;
    35 import java.nio.ByteBuffer;
    35 import java.nio.ByteBuffer;
    36 import java.nio.charset.Charset;
    36 import java.nio.charset.Charset;
    37 import java.nio.channels.FileChannel;
    37 import java.nio.channels.FileChannel;
    44 import java.util.List;
    44 import java.util.List;
    45 import java.util.Objects;
    45 import java.util.Objects;
    46 import java.util.Optional;
    46 import java.util.Optional;
    47 import java.util.concurrent.CompletableFuture;
    47 import java.util.concurrent.CompletableFuture;
    48 import java.util.concurrent.CompletionStage;
    48 import java.util.concurrent.CompletionStage;
       
    49 import java.util.concurrent.ConcurrentMap;
    49 import java.util.concurrent.Flow;
    50 import java.util.concurrent.Flow;
    50 import java.util.concurrent.Flow.Subscriber;
    51 import java.util.concurrent.Flow.Subscriber;
       
    52 import java.util.function.BiFunction;
    51 import java.util.function.Consumer;
    53 import java.util.function.Consumer;
    52 import java.util.function.Function;
    54 import java.util.function.Function;
    53 import java.util.stream.Stream;
    55 import java.util.stream.Stream;
    54 import javax.net.ssl.SSLParameters;
    56 import javax.net.ssl.SSLParameters;
    55 
    57 
   312      * }
   314      * }
   313      * </pre>
   315      * </pre>
   314      *
   316      *
   315      * @param <T> the response body type
   317      * @param <T> the response body type
   316      */
   318      */
   317     @FunctionalInterface
       
   318     public interface BodyHandler<T> {
   319     public interface BodyHandler<T> {
   319 
       
   320         /**
   320         /**
   321          * Returns a {@link BodySubscriber BodySubscriber} considering the given
   321          * Returns a {@link BodySubscriber BodySubscriber} considering the given
   322          * response status code and headers. This method is always called before
   322          * response status code and headers. This method is always called before
   323          * the body is read and its implementation can decide to keep the body
   323          * the body is read and its implementation can decide to keep the body
   324          * and store it somewhere, or else discard it by returning the {@code
   324          * and store it somewhere, or else discard it by returning the {@code
   328          * @param statusCode the HTTP status code received
   328          * @param statusCode the HTTP status code received
   329          * @param responseHeaders the response headers received
   329          * @param responseHeaders the response headers received
   330          * @return a body subscriber
   330          * @return a body subscriber
   331          */
   331          */
   332         public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);
   332         public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);
       
   333 
   333 
   334 
   334         /**
   335         /**
   335          * Returns a response body handler that returns a {@link BodySubscriber
   336          * Returns a response body handler that returns a {@link BodySubscriber
   336          * BodySubscriber}{@code <Void>} obtained from {@linkplain
   337          * BodySubscriber}{@code <Void>} obtained from {@linkplain
   337          * BodySubscriber#fromSubscriber(Subscriber)}, with the given
   338          * BodySubscriber#fromSubscriber(Subscriber)}, with the given
   722                                 bufferSize);
   723                                 bufferSize);
   723          }
   724          }
   724     }
   725     }
   725 
   726 
   726     /**
   727     /**
       
   728      * A handler of <i>push promises</i> ...
       
   729      */
       
   730     public interface PushPromiseHandler<T> {
       
   731         /**
       
   732          * Notifies of an incoming Push Promise. The enclosing request from the user and the push promise
       
   733          * are supplied as parameters, and also a {@link Function} which must be called in the implementation
       
   734          * of this method, if the server push is to be accepted. If this method returns without the function
       
   735          * being called, then the push will be cancelled.
       
   736          */
       
   737         public void applyPushPromise(
       
   738             HttpRequest initial, HttpRequest pushPromise,
       
   739             Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor
       
   740         );
       
   741 
       
   742 
       
   743         /* package-private with push promise Map implementation */
       
   744         static class PushPromisesHandlerWithMap<T> implements PushPromiseHandler<T> {
       
   745 
       
   746             private final ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap;
       
   747             private final Function<HttpRequest,BodyHandler<T>> pushPromiseHandler;
       
   748 
       
   749             PushPromisesHandlerWithMap(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
       
   750                                        ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
       
   751                 this.pushPromiseHandler = pushPromiseHandler;
       
   752                 this.pushPromisesMap = pushPromisesMap;
       
   753             }
       
   754 
       
   755             @Override
       
   756             public void applyPushPromise(
       
   757                 HttpRequest initiatingRequest, HttpRequest pushRequest,
       
   758                 Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor)
       
   759             {
       
   760                 URI initiatingURI = initiatingRequest.uri();
       
   761                 URI pushRequestURI = pushRequest.uri();
       
   762                 if (!initiatingURI.getHost().equalsIgnoreCase(pushRequestURI.getHost()))
       
   763                     return;
       
   764 
       
   765                 int initiatingPort = initiatingURI.getPort();
       
   766                 if (initiatingPort == -1 ) {
       
   767                     if ("https".equalsIgnoreCase(initiatingURI.getScheme()))
       
   768                         initiatingPort = 443;
       
   769                     else
       
   770                         initiatingPort = 80;
       
   771                 }
       
   772                 int pushPort = pushRequestURI.getPort();
       
   773                 if (pushPort == -1 ) {
       
   774                     if ("https".equalsIgnoreCase(pushRequestURI.getScheme()))
       
   775                         pushPort = 443;
       
   776                     else
       
   777                         pushPort = 80;
       
   778                 }
       
   779                 if (initiatingPort != pushPort)
       
   780                     return;
       
   781 
       
   782                 CompletableFuture<HttpResponse<T>> cf = acceptor.apply(pushPromiseHandler.apply(pushRequest));
       
   783                 pushPromisesMap.put(pushRequest, cf);
       
   784             }
       
   785         }
       
   786 
       
   787         /**
       
   788          * Returns a push promise handler that accumulates push promises, and
       
   789          * their responses, into the given map.
       
   790          *
       
   791          * <p> Entries are added to the given map for each synthetic push
       
   792          * request ( push promise ) accepted. The entry's key is the
       
   793          * push request, and the entry's value is a CompletableFuture that
       
   794          * completes with the response corresponding to the key's push
       
   795          * request. A push request is rejected / cancelled if there is
       
   796          * already an entry in the map whose key is {@linkplain HttpRequest#equals
       
   797          * equal} to it. A push request is rejected / cancelled if it
       
   798          * does not have the same origin as its initiating request.
       
   799          *
       
   800          * <p> Entries are added to the given map as soon as practically
       
   801          * possible when a push promise is received and accepted. That way code,
       
   802          * using such a map like a cache, can determine if a push promise has
       
   803          * been issued by the server and avoid making, possibly, unnecessary
       
   804          * requests.
       
   805          *
       
   806          * <p> The delivery of pushed content is not synchronized with the
       
   807          * delivery of the main response. However, when the main response
       
   808          * has been fully received, the map is guaranteed to be fully populated
       
   809          * with no more entries added. The individual {@code CompletableFutures}
       
   810          * contained in the Map may or may not already be completed at this point.
       
   811          *
       
   812          * @param <T> the push promise body type
       
   813          * @param pushPromiseHandler the body handler to use for push promises
       
   814          * @param pushPromisesMap a map to accumulate push promises into
       
   815          * @return a push promise body handler
       
   816          */
       
   817         public static <T> PushPromiseHandler<T>
       
   818         withPushPromises(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
       
   819                          ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
       
   820             return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap);
       
   821         }
       
   822     }
       
   823 
       
   824     /**
   727      * A subscriber for response bodies.
   825      * A subscriber for response bodies.
   728      * {@Incubating}
   826      * {@Incubating}
   729      *
   827      *
   730      * <p> The object acts as a {@link Flow.Subscriber}&lt;{@link List}&lt;{@link
   828      * <p> The object acts as a {@link Flow.Subscriber}&lt;{@link List}&lt;{@link
   731      * ByteBuffer}&gt;&gt; to the HTTP client implementation, which publishes
   829      * ByteBuffer}&gt;&gt; to the HTTP client implementation, which publishes
  1079              if (bufferSize <= 0)
  1177              if (bufferSize <= 0)
  1080                  throw new IllegalArgumentException("must be greater than 0");
  1178                  throw new IllegalArgumentException("must be greater than 0");
  1081              return new BufferingSubscriber<T>(downstream, bufferSize);
  1179              return new BufferingSubscriber<T>(downstream, bufferSize);
  1082          }
  1180          }
  1083     }
  1181     }
  1084 
       
  1085     /**
       
  1086      * A response subscriber for a HTTP/2 multi response.
       
  1087      * {@Incubating}
       
  1088      *
       
  1089      * <p> A multi response comprises a main response, and zero or more additional
       
  1090      * responses. Each additional response is sent by the server in response to
       
  1091      * requests (PUSH_PROMISEs) that the server also generates. Additional responses are
       
  1092      * typically resources that the server expects the client will need which
       
  1093      * are related to the initial request.
       
  1094      * <p>
       
  1095      * Note. Instead of implementing this interface, applications should consider
       
  1096      * first using the mechanism (built on this interface) provided by
       
  1097      * {@link MultiSubscriber#asMap(java.util.function.Function, boolean)
       
  1098      * MultiSubscriber.asMap()} which is a slightly simplified, but also
       
  1099      * general purpose interface.
       
  1100      * <p>
       
  1101      * The server generated requests are also known as <i>push promises</i>.
       
  1102      * The server is permitted to send any number of these requests up to the
       
  1103      * point where the main response is fully received. Therefore, after
       
  1104      * completion of the main response, the final number of additional
       
  1105      * responses is known. Additional responses may be canceled, but given that
       
  1106      * the server does not wait for any acknowledgment before sending the
       
  1107      * response, this must be done quickly to avoid unnecessary data transmission.
       
  1108      *
       
  1109      * <p> {@code MultiSubscriber}s are parameterized with a type {@code U} which
       
  1110      * represents some meaningful aggregate of the responses received. This
       
  1111      * would typically be a collection of response or response body objects.
       
  1112      *
       
  1113      * @param <U> a type representing the aggregated results
       
  1114      * @param <T> a type representing all of the response bodies
       
  1115      *
       
  1116      * @since 9
       
  1117      */
       
  1118     public interface MultiSubscriber<U,T> {
       
  1119         /**
       
  1120          * Called for the main request from the user. This {@link HttpRequest}
       
  1121          * parameter is the request that was supplied to {@link
       
  1122          * HttpClient#sendAsync(HttpRequest, MultiSubscriber)}. The
       
  1123          * implementation must return a {@link BodyHandler} for the response
       
  1124          * body.
       
  1125          *
       
  1126          * @param request the request
       
  1127          *
       
  1128          * @return a body handler
       
  1129          */
       
  1130         BodyHandler<T> onRequest(HttpRequest request);
       
  1131 
       
  1132         /**
       
  1133          * Called for each push promise that is received. The {@link HttpRequest}
       
  1134          * parameter represents the PUSH_PROMISE. The implementation must return
       
  1135          * an {@code Optional} of {@link BodyHandler} for the response body.
       
  1136          * Different handlers (of the same type) can be returned for different
       
  1137          * pushes within the same multi send. If no handler (an empty {@code
       
  1138          * Optional}) is returned, then the push will be canceled. If required,
       
  1139          * the {@code CompletableFuture<Void>} supplied to the {@code
       
  1140          * onFinalPushPromise} parameter of {@link
       
  1141          * #completion(CompletableFuture, CompletableFuture)} can be used to
       
  1142          * determine when the final PUSH_PROMISE is received.
       
  1143          *
       
  1144          * @param pushPromise the push promise
       
  1145          *
       
  1146          * @return an optional body handler
       
  1147          */
       
  1148         Optional<BodyHandler<T>> onPushPromise(HttpRequest pushPromise);
       
  1149 
       
  1150         /**
       
  1151          * Called for each response received. For each request either one of
       
  1152          * onResponse() or onError() is guaranteed to be called, but not both.
       
  1153          *
       
  1154          * <p> Note: The reason for switching to this callback interface rather
       
  1155          * than using CompletableFutures supplied to onRequest() is that there
       
  1156          * is a subtle interaction between those CFs and the CF returned from
       
  1157          * completion() (or when onComplete() was called formerly). The completion()
       
  1158          * CF will not complete until after all of the work done by the onResponse()
       
  1159          * calls is done. Whereas if you just create CF's dependent on a supplied
       
  1160          * CF (to onRequest()) then the implementation has no visibility of the
       
  1161          * dependent CFs and can't guarantee to call onComplete() (or complete
       
  1162          * the completion() CF) after the dependent CFs complete.
       
  1163          *
       
  1164          * @param response the response received
       
  1165          */
       
  1166         void onResponse(HttpResponse<T> response);
       
  1167 
       
  1168         /**
       
  1169          * Called if an error occurs receiving a response. For each request
       
  1170          * either one of onResponse() or onError() is guaranteed to be called,
       
  1171          * but not both.
       
  1172          *
       
  1173          * @param request the main request or subsequent push promise
       
  1174          * @param t the Throwable that caused the error
       
  1175          */
       
  1176         void onError(HttpRequest request, Throwable t);
       
  1177 
       
  1178         /**
       
  1179          * Returns a {@link java.util.concurrent.CompletableFuture}{@code <U>}
       
  1180          * which completes when the aggregate result object itself is available.
       
  1181          * It is expected that the returned {@code CompletableFuture} will depend
       
  1182          * on one of the given {@code CompletableFuture<Void>}s which themselves
       
  1183          * complete after all individual responses associated with the multi
       
  1184          * response have completed, or after all push promises have been received.
       
  1185          * This method is called after {@link #onRequest(HttpRequest)} but
       
  1186          * before any other methods.
       
  1187          *
       
  1188          * @implNote Implementations might follow the pattern shown below
       
  1189          * <pre>
       
  1190          * {@code
       
  1191          *      CompletableFuture<U> completion(
       
  1192          *              CompletableFuture<Void> onComplete,
       
  1193          *              CompletableFuture<Void> onFinalPushPromise)
       
  1194          *      {
       
  1195          *          return onComplete.thenApply((v) -> {
       
  1196          *              U u = ... instantiate and populate a U instance
       
  1197          *              return u;
       
  1198          *          });
       
  1199          *      }
       
  1200          * }
       
  1201          * </pre>
       
  1202          *
       
  1203          * @param onComplete a CompletableFuture which completes after all
       
  1204          * responses have been received relating to this multi request.
       
  1205          *
       
  1206          * @param onFinalPushPromise CompletableFuture which completes after all
       
  1207          * push promises have been received.
       
  1208          *
       
  1209          * @return the aggregate CF response object
       
  1210          */
       
  1211         CompletableFuture<U> completion(CompletableFuture<Void> onComplete,
       
  1212                 CompletableFuture<Void> onFinalPushPromise);
       
  1213 
       
  1214         /**
       
  1215          * Returns a general purpose handler for multi responses. The aggregated
       
  1216          * result object produced by this handler is a
       
  1217          * {@code Map<HttpRequest,CompletableFuture<HttpResponse<V>>>}. Each
       
  1218          * request (both the original user generated request and each server
       
  1219          * generated push promise) is returned as a key of the map. The value
       
  1220          * corresponding to each key is a
       
  1221          * {@code CompletableFuture<HttpResponse<V>>}.
       
  1222          *
       
  1223          * <p> There are two ways to use these handlers, depending on the value
       
  1224          * of the <i>completion</i> parameter. If completion is true, then the
       
  1225          * aggregated result will be available after all responses have
       
  1226          * themselves completed. If <i>completion</i> is false, then the
       
  1227          * aggregated result will be available immediately after the last push
       
  1228          * promise was received. In the former case, this implies that all the
       
  1229          * CompletableFutures in the map values will have completed. In the
       
  1230          * latter case, they may or may not have completed yet.
       
  1231          *
       
  1232          * <p> The simplest way to use these handlers is to set completion to
       
  1233          * {@code true}, and then all (results) values in the Map will be
       
  1234          * accessible without blocking.
       
  1235          * <p>
       
  1236          * See {@link #asMap(java.util.function.Function, boolean)}
       
  1237          * for a code sample of using this interface.
       
  1238          *
       
  1239          * <p> See {@link #asMap(Function, boolean)} for a code sample of using
       
  1240          * this interface.
       
  1241          *
       
  1242          * @param <V> the body type used for all responses
       
  1243          * @param reqHandler a function invoked for the user's request and each
       
  1244          *                   push promise
       
  1245          * @param completion {@code true} if the aggregate CompletableFuture
       
  1246          *                   completes after all responses have been received,
       
  1247          *                   or {@code false} after all push promises received
       
  1248          *
       
  1249          * @return a MultiSubscriber
       
  1250          */
       
  1251         public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap(
       
  1252                 Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler,
       
  1253                 boolean completion) {
       
  1254             return new MultiSubscriberImpl<V>(reqHandler.andThen(optv -> optv.get()),
       
  1255                                               reqHandler,
       
  1256                                               completion);
       
  1257         }
       
  1258 
       
  1259         /**
       
  1260          * Returns a general purpose handler for multi responses. This is a
       
  1261          * convenience method which invokes {@link #asMap(Function,boolean)
       
  1262          * asMap(Function, true)} meaning that the aggregate result
       
  1263          * object completes after all responses have been received.
       
  1264          *
       
  1265          * <p><b>Example usage:</b>
       
  1266          * <br>
       
  1267          * <pre>
       
  1268          * {@code
       
  1269          *          HttpRequest request = HttpRequest.newBuilder()
       
  1270          *                  .uri(URI.create("https://www.foo.com/"))
       
  1271          *                  .GET()
       
  1272          *                  .build();
       
  1273          *
       
  1274          *          HttpClient client = HttpClient.newHttpClient();
       
  1275          *
       
  1276          *          Map<HttpRequest,CompletableFuture<HttpResponse<String>>> results = client
       
  1277          *              .sendAsync(request, MultiSubscriber.asMap(
       
  1278          *                  (req) -> Optional.of(HttpResponse.BodyHandler.asString())))
       
  1279          *              .join();
       
  1280          * }</pre>
       
  1281          *
       
  1282          * <p> The lambda in this example is the simplest possible implementation,
       
  1283          * where neither the incoming requests are examined, nor the response
       
  1284          * headers, and every push that the server sends is accepted. When the
       
  1285          * join() call returns, all {@code HttpResponse}s and their associated
       
  1286          * body objects are available.
       
  1287          *
       
  1288          * @param <V> the body type used for all responses
       
  1289          * @param reqHandler a function invoked for each push promise and the
       
  1290          *                   main request
       
  1291          * @return a MultiSubscriber
       
  1292          */
       
  1293         public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap(
       
  1294                 Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler) {
       
  1295 
       
  1296             return asMap(reqHandler, true);
       
  1297         }
       
  1298 
       
  1299     }
       
  1300 }
  1182 }