722 bufferSize); |
723 bufferSize); |
723 } |
724 } |
724 } |
725 } |
725 |
726 |
726 /** |
727 /** |
|
728 * A handler of <i>push promises</i> ... |
|
729 */ |
|
public interface PushPromiseHandler<T> {

    /**
     * Notifies of an incoming Push Promise. The enclosing request from the
     * user and the push promise are supplied as parameters, and also a
     * {@link Function} which must be called in the implementation of this
     * method, if the server push is to be accepted. If this method returns
     * without the function being called, then the push will be cancelled.
     *
     * @param initial the request from the user that the push promise
     *        relates to
     * @param pushPromise the synthetic request describing the pushed resource
     * @param acceptor the function to call, with the body handler for the
     *        pushed response, in order to accept the push
     */
    public void applyPushPromise(
        HttpRequest initial, HttpRequest pushPromise,
        Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor
    );


    /* Push promise handler implementation that records accepted pushes in a Map */
    static class PushPromisesHandlerWithMap<T> implements PushPromiseHandler<T> {

        private final ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap;
        private final Function<HttpRequest,BodyHandler<T>> pushPromiseHandler;

        PushPromisesHandlerWithMap(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
                                   ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
            this.pushPromiseHandler = pushPromiseHandler;
            this.pushPromisesMap = pushPromisesMap;
        }

        /**
         * Accepts the push promise and records its response future in the
         * map, unless the push request has a different host or effective
         * port than the initiating request, or an equal request is already
         * a key of the map. In those cases the method returns without
         * calling {@code acceptor}.
         */
        @Override
        public void applyPushPromise(
            HttpRequest initiatingRequest, HttpRequest pushRequest,
            Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor)
        {
            URI initiatingURI = initiatingRequest.uri();
            URI pushRequestURI = pushRequest.uri();

            // Same-origin check, part 1: host names compared case-insensitively.
            if (!initiatingURI.getHost().equalsIgnoreCase(pushRequestURI.getHost()))
                return;

            // Same-origin check, part 2: ports, after substituting the
            // scheme default for an unspecified port.
            if (effectivePort(initiatingURI) != effectivePort(pushRequestURI))
                return;

            // An equal push request is already recorded: do not accept it
            // again and do not replace the recorded CompletableFuture.
            // NOTE(review): this check-then-put is not atomic; it assumes
            // equal push promises are not delivered concurrently -- confirm.
            if (pushPromisesMap.containsKey(pushRequest))
                return;

            CompletableFuture<HttpResponse<T>> cf =
                    acceptor.apply(pushPromiseHandler.apply(pushRequest));
            pushPromisesMap.put(pushRequest, cf);
        }

        /**
         * Returns the port of the given URI, or the scheme default
         * (443 for https, 80 otherwise) when the URI does not specify one.
         */
        private static int effectivePort(URI uri) {
            int port = uri.getPort();
            if (port != -1)
                return port;
            return "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
        }
    }

    /**
     * Returns a push promise handler that accumulates push promises, and
     * their responses, into the given map.
     *
     * <p> Entries are added to the given map for each synthetic push
     * request ( push promise ) accepted. The entry's key is the
     * push request, and the entry's value is a CompletableFuture that
     * completes with the response corresponding to the key's push
     * request. A push request is rejected / cancelled if there is
     * already an entry in the map whose key is {@linkplain
     * HttpRequest#equals(Object) equal} to it. A push request is
     * rejected / cancelled if it does not have the same origin as its
     * initiating request.
     *
     * <p> Entries are added to the given map as soon as practically
     * possible when a push promise is received and accepted. That way code,
     * using such a map like a cache, can determine if a push promise has
     * been issued by the server and avoid making, possibly, unnecessary
     * requests.
     *
     * <p> The delivery of pushed content is not synchronized with the
     * delivery of the main response. However, when the main response
     * has been fully received, the map is guaranteed to be fully populated
     * with no more entries added. The individual {@code CompletableFutures}
     * contained in the Map may or may not already be completed at this point.
     *
     * @param <T> the push promise body type
     * @param pushPromiseHandler the body handler to use for push promises
     * @param pushPromisesMap a map to accumulate push promises into
     * @return a push promise handler
     */
    public static <T> PushPromiseHandler<T>
    withPushPromises(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
                     ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
        return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap);
    }
}
|
823 |
|
824 /** |
727 * A subscriber for response bodies. |
825 * A subscriber for response bodies. |
728 * {@Incubating} |
826 * {@Incubating} |
729 * |
827 * |
730 * <p> The object acts as a {@link Flow.Subscriber}<{@link List}<{@link |
828 * <p> The object acts as a {@link Flow.Subscriber}<{@link List}<{@link |
731 * ByteBuffer}>> to the HTTP client implementation, which publishes |
829 * ByteBuffer}>> to the HTTP client implementation, which publishes |
1079 if (bufferSize <= 0) |
1177 if (bufferSize <= 0) |
1080 throw new IllegalArgumentException("must be greater than 0"); |
1178 throw new IllegalArgumentException("must be greater than 0"); |
1081 return new BufferingSubscriber<T>(downstream, bufferSize); |
1179 return new BufferingSubscriber<T>(downstream, bufferSize); |
1082 } |
1180 } |
1083 } |
1181 } |
1084 |
|
1085 /** |
|
1086 * A response subscriber for a HTTP/2 multi response. |
|
1087 * {@Incubating} |
|
1088 * |
|
1089 * <p> A multi response comprises a main response, and zero or more additional |
|
1090 * responses. Each additional response is sent by the server in response to |
|
1091 * requests (PUSH_PROMISEs) that the server also generates. Additional responses are |
|
1092 * typically resources that the server expects the client will need which |
|
1093 * are related to the initial request. |
|
1094 * <p> |
|
1095 * Note. Instead of implementing this interface, applications should consider |
|
1096 * first using the mechanism (built on this interface) provided by |
|
1097 * {@link MultiSubscriber#asMap(java.util.function.Function, boolean) |
|
1098 * MultiSubscriber.asMap()} which is a slightly simplified, but also |
|
1099 * general purpose interface. |
|
1100 * <p> |
|
1101 * The server generated requests are also known as <i>push promises</i>. |
|
1102 * The server is permitted to send any number of these requests up to the |
|
1103 * point where the main response is fully received. Therefore, after |
|
1104 * completion of the main response, the final number of additional |
|
1105 * responses is known. Additional responses may be canceled, but given that |
|
1106 * the server does not wait for any acknowledgment before sending the |
|
1107 * response, this must be done quickly to avoid unnecessary data transmission. |
|
1108 * |
|
1109 * <p> {@code MultiSubscriber}s are parameterized with a type {@code U} which |
|
1110 * represents some meaningful aggregate of the responses received. This |
|
1111 * would typically be a collection of response or response body objects. |
|
1112 * |
|
1113 * @param <U> a type representing the aggregated results |
|
1114 * @param <T> a type representing all of the response bodies |
|
1115 * |
|
1116 * @since 9 |
|
1117 */ |
|
1118 public interface MultiSubscriber<U,T> { |
|
1119 /** |
|
1120 * Called for the main request from the user. This {@link HttpRequest} |
|
1121 * parameter is the request that was supplied to {@link |
|
1122 * HttpClient#sendAsync(HttpRequest, MultiSubscriber)}. The |
|
1123 * implementation must return an {@link BodyHandler} for the response |
|
1124 * body. |
|
1125 * |
|
1126 * @param request the request |
|
1127 * |
|
1128 * @return an optional body handler |
|
1129 */ |
|
1130 BodyHandler<T> onRequest(HttpRequest request); |
|
1131 |
|
1132 /** |
|
1133 * Called for each push promise that is received. The {@link HttpRequest} |
|
1134 * parameter represents the PUSH_PROMISE. The implementation must return |
|
1135 * an {@code Optional} of {@link BodyHandler} for the response body. |
|
1136 * Different handlers (of the same type) can be returned for different |
|
1137 * pushes within the same multi send. If no handler (an empty {@code |
|
1138 * Optional}) is returned, then the push will be canceled. If required, |
|
1139 * the {@code CompletableFuture<Void>} supplied to the {@code |
|
1140 * onFinalPushPromise} parameter of {@link |
|
1141 * #completion(CompletableFuture, CompletableFuture)} can be used to |
|
1142 * determine when the final PUSH_PROMISE is received. |
|
1143 * |
|
1144 * @param pushPromise the push promise |
|
1145 * |
|
1146 * @return an optional body handler |
|
1147 */ |
|
1148 Optional<BodyHandler<T>> onPushPromise(HttpRequest pushPromise); |
|
1149 |
|
1150 /** |
|
1151 * Called for each response received. For each request either one of |
|
1152 * onResponse() or onError() is guaranteed to be called, but not both. |
|
1153 * |
|
1154 * <p> Note: The reason for switching to this callback interface rather |
|
1155 * than using CompletableFutures supplied to onRequest() is that there |
|
1156 * is a subtle interaction between those CFs and the CF returned from |
|
1157 * completion() (or when onComplete() was called formerly). The completion() |
|
1158 * CF will not complete until after all of the work done by the onResponse() |
|
1159 * calls is done. Whereas if you just create CF's dependent on a supplied |
|
1160 * CF (to onRequest()) then the implementation has no visibility of the |
|
1161 * dependent CFs and can't guarantee to call onComplete() (or complete |
|
1162 * the completion() CF) after the dependent CFs complete. |
|
1163 * |
|
1164 * @param response the response received |
|
1165 */ |
|
1166 void onResponse(HttpResponse<T> response); |
|
1167 |
|
1168 /** |
|
1169 * Called if an error occurs receiving a response. For each request |
|
1170 * either one of onResponse() or onError() is guaranteed to be called, |
|
1171 * but not both. |
|
1172 * |
|
1173 * @param request the main request or subsequent push promise |
|
1174 * @param t the Throwable that caused the error |
|
1175 */ |
|
1176 void onError(HttpRequest request, Throwable t); |
|
1177 |
|
1178 /** |
|
1179 * Returns a {@link java.util.concurrent.CompletableFuture}{@code <U>} |
|
1180 * which completes when the aggregate result object itself is available. |
|
1181 * It is expected that the returned {@code CompletableFuture} will depend |
|
1182 * on one of the given {@code CompletableFuture<Void}s which themselves |
|
1183 * complete after all individual responses associated with the multi |
|
1184 * response have completed, or after all push promises have been received. |
|
1185 * This method is called after {@link #onRequest(HttpRequest)} but |
|
1186 * before any other methods. |
|
1187 * |
|
1188 * @implNote Implementations might follow the pattern shown below |
|
1189 * <pre> |
|
1190 * {@code |
|
1191 * CompletableFuture<U> completion( |
|
1192 * CompletableFuture<Void> onComplete, |
|
1193 * CompletableFuture<Void> onFinalPushPromise) |
|
1194 * { |
|
1195 * return onComplete.thenApply((v) -> { |
|
1196 * U u = ... instantiate and populate a U instance |
|
1197 * return u; |
|
1198 * }); |
|
1199 * } |
|
1200 * } |
|
1201 * </pre> |
|
1202 * |
|
1203 * @param onComplete a CompletableFuture which completes after all |
|
1204 * responses have been received relating to this multi request. |
|
1205 * |
|
1206 * @param onFinalPushPromise CompletableFuture which completes after all |
|
1207 * push promises have been received. |
|
1208 * |
|
1209 * @return the aggregate CF response object |
|
1210 */ |
|
1211 CompletableFuture<U> completion(CompletableFuture<Void> onComplete, |
|
1212 CompletableFuture<Void> onFinalPushPromise); |
|
1213 |
|
1214 /** |
|
1215 * Returns a general purpose handler for multi responses. The aggregated |
|
1216 * result object produced by this handler is a |
|
1217 * {@code Map<HttpRequest,CompletableFuture<HttpResponse<V>>>}. Each |
|
1218 * request (both the original user generated request and each server |
|
1219 * generated push promise) is returned as a key of the map. The value |
|
1220 * corresponding to each key is a |
|
1221 * {@code CompletableFuture<HttpResponse<V>>}. |
|
1222 * |
|
1223 * <p> There are two ways to use these handlers, depending on the value |
|
1224 * of the <i>completion</I> parameter. If completion is true, then the |
|
1225 * aggregated result will be available after all responses have |
|
1226 * themselves completed. If <i>completion</i> is false, then the |
|
1227 * aggregated result will be available immediately after the last push |
|
1228 * promise was received. In the former case, this implies that all the |
|
1229 * CompletableFutures in the map values will have completed. In the |
|
1230 * latter case, they may or may not have completed yet. |
|
1231 * |
|
1232 * <p> The simplest way to use these handlers is to set completion to |
|
1233 * {@code true}, and then all (results) values in the Map will be |
|
1234 * accessible without blocking. |
|
1235 * <p> |
|
1236 * See {@link #asMap(java.util.function.Function, boolean)} |
|
1237 * for a code sample of using this interface. |
|
1238 * |
|
1239 * <p> See {@link #asMap(Function, boolean)} for a code sample of using |
|
1240 * this interface. |
|
1241 * |
|
1242 * @param <V> the body type used for all responses |
|
1243 * @param reqHandler a function invoked for the user's request and each |
|
1244 * push promise |
|
1245 * @param completion {@code true} if the aggregate CompletableFuture |
|
1246 * completes after all responses have been received, |
|
1247 * or {@code false} after all push promises received |
|
1248 * |
|
1249 * @return a MultiSubscriber |
|
1250 */ |
|
1251 public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap( |
|
1252 Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler, |
|
1253 boolean completion) { |
|
1254 return new MultiSubscriberImpl<V>(reqHandler.andThen(optv -> optv.get()), |
|
1255 reqHandler, |
|
1256 completion); |
|
1257 } |
|
1258 |
|
1259 /** |
|
1260 * Returns a general purpose handler for multi responses. This is a |
|
1261 * convenience method which invokes {@link #asMap(Function,boolean) |
|
1262 * asMap(Function, true)} meaning that the aggregate result |
|
1263 * object completes after all responses have been received. |
|
1264 * |
|
1265 * <p><b>Example usage:</b> |
|
1266 * <br> |
|
1267 * <pre> |
|
1268 * {@code |
|
1269 * HttpRequest request = HttpRequest.newBuilder() |
|
1270 * .uri(URI.create("https://www.foo.com/")) |
|
1271 * .GET() |
|
1272 * .build(); |
|
1273 * |
|
1274 * HttpClient client = HttpClient.newHttpClient(); |
|
1275 * |
|
1276 * Map<HttpRequest,CompletableFuture<HttpResponse<String>>> results = client |
|
1277 * .sendAsync(request, MultiSubscriber.asMap( |
|
1278 * (req) -> Optional.of(HttpResponse.BodyHandler.asString()))) |
|
1279 * .join(); |
|
1280 * }</pre> |
|
1281 * |
|
1282 * <p> The lambda in this example is the simplest possible implementation, |
|
1283 * where neither the incoming requests are examined, nor the response |
|
1284 * headers, and every push that the server sends is accepted. When the |
|
1285 * join() call returns, all {@code HttpResponse}s and their associated |
|
1286 * body objects are available. |
|
1287 * |
|
1288 * @param <V> the body type used for all responses |
|
1289 * @param reqHandler a function invoked for each push promise and the |
|
1290 * main request |
|
1291 * @return a MultiSubscriber |
|
1292 */ |
|
1293 public static <V> MultiSubscriber<MultiMapResult<V>,V> asMap( |
|
1294 Function<HttpRequest, Optional<HttpResponse.BodyHandler<V>>> reqHandler) { |
|
1295 |
|
1296 return asMap(reqHandler, true); |
|
1297 } |
|
1298 |
|
1299 } |
|
1300 } |
1182 } |