1 /* |
1 /* |
2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
|
28 import java.io.BufferedReader; |
28 import java.io.IOException; |
29 import java.io.IOException; |
29 import java.io.InputStream; |
30 import java.io.InputStream; |
|
31 import java.io.InputStreamReader; |
30 import java.lang.System.Logger.Level; |
32 import java.lang.System.Logger.Level; |
31 import java.nio.ByteBuffer; |
33 import java.nio.ByteBuffer; |
32 import java.nio.channels.FileChannel; |
34 import java.nio.channels.FileChannel; |
|
35 import java.nio.charset.Charset; |
|
36 import java.nio.charset.StandardCharsets; |
33 import java.nio.file.OpenOption; |
37 import java.nio.file.OpenOption; |
34 import java.nio.file.Path; |
38 import java.nio.file.Path; |
35 import java.security.AccessControlContext; |
39 import java.security.AccessControlContext; |
36 import java.security.AccessController; |
40 import java.security.AccessController; |
37 import java.security.PrivilegedActionException; |
41 import java.security.PrivilegedActionException; |
50 import java.util.concurrent.Flow.Subscriber; |
54 import java.util.concurrent.Flow.Subscriber; |
51 import java.util.concurrent.Flow.Subscription; |
55 import java.util.concurrent.Flow.Subscription; |
52 import java.util.concurrent.atomic.AtomicBoolean; |
56 import java.util.concurrent.atomic.AtomicBoolean; |
53 import java.util.function.Consumer; |
57 import java.util.function.Consumer; |
54 import java.util.function.Function; |
58 import java.util.function.Function; |
|
59 import java.util.stream.Stream; |
55 import jdk.incubator.http.internal.common.MinimalFuture; |
60 import jdk.incubator.http.internal.common.MinimalFuture; |
56 import jdk.incubator.http.internal.common.Utils; |
61 import jdk.incubator.http.internal.common.Utils; |
57 |
62 |
58 class ResponseSubscribers { |
63 class ResponseSubscribers { |
59 |
64 |
459 } |
464 } |
460 } |
465 } |
461 |
466 |
462 } |
467 } |
463 |
468 |
|
469 /** |
|
470 * A {@code Stream<String>} built on top of the Flow API. |
|
471 */ |
|
472 static final class HttpLineStream implements HttpResponse.BodySubscriber<Stream<String>> { |
|
473 |
|
474 private final HttpResponseInputStream responseInputStream; |
|
475 private final Charset charset; |
|
476 private HttpLineStream(Charset charset) { |
|
477 this.charset = Objects.requireNonNull(charset); |
|
478 responseInputStream = new HttpResponseInputStream(); |
|
479 } |
|
480 |
|
481 @Override |
|
482 public CompletionStage<Stream<String>> getBody() { |
|
483 return responseInputStream.getBody().thenApply((is) -> |
|
484 new BufferedReader(new InputStreamReader(is, charset)) |
|
485 .lines().onClose(this::close)); |
|
486 } |
|
487 |
|
488 @Override |
|
489 public void onSubscribe(Subscription subscription) { |
|
490 responseInputStream.onSubscribe(subscription); |
|
491 } |
|
492 |
|
493 @Override |
|
494 public void onNext(List<ByteBuffer> item) { |
|
495 responseInputStream.onNext(item); |
|
496 } |
|
497 |
|
498 @Override |
|
499 public void onError(Throwable throwable) { |
|
500 responseInputStream.onError(throwable); |
|
501 } |
|
502 |
|
503 @Override |
|
504 public void onComplete() { |
|
505 responseInputStream.onComplete(); |
|
506 } |
|
507 |
|
508 void close() { |
|
509 try { |
|
510 responseInputStream.close(); |
|
511 } catch (IOException x) { |
|
512 // ignore |
|
513 } |
|
514 } |
|
515 |
|
516 static HttpLineStream create(Charset charset) { |
|
517 return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8)); |
|
518 } |
|
519 } |
|
520 |
464 static class MultiSubscriberImpl<V> |
521 static class MultiSubscriberImpl<V> |
465 implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V> |
522 implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V> |
466 { |
523 { |
467 private final MultiMapResult<V> results; |
524 private final MultiMapResult<V> results; |
468 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; |
525 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; |