src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
branch datagramsocketimpl-branch
changeset 58678 9cf78a70fa4f
parent 53467 97cf88608d76
child 58679 9c3209ff7550
equal deleted inserted replaced
58677:13588c901957 58678:9cf78a70fa4f
   123             return result;
   123             return result;
   124         }
   124         }
   125 
   125 
   126         @Override
   126         @Override
   127         public void onSubscribe(Flow.Subscription subscription) {
   127         public void onSubscribe(Flow.Subscription subscription) {
       
   128             Objects.requireNonNull(subscription);
   128             if (!subscribed.compareAndSet(false, true)) {
   129             if (!subscribed.compareAndSet(false, true)) {
   129                 subscription.cancel();
   130                 subscription.cancel();
   130             } else {
   131             } else {
   131                 this.subscription = subscription;
   132                 this.subscription = subscription;
   132                 subscription.request(1);
   133                 subscription.request(1);
   133             }
   134             }
   134         }
   135         }
   135 
   136 
   136         @Override
   137         @Override
   137         public void onNext(List<ByteBuffer> items) {
   138         public void onNext(List<ByteBuffer> items) {
       
   139             Objects.requireNonNull(items);
   138             for (ByteBuffer item : items) {
   140             for (ByteBuffer item : items) {
   139                 byte[] buf = new byte[item.remaining()];
   141                 byte[] buf = new byte[item.remaining()];
   140                 item.get(buf);
   142                 item.get(buf);
   141                 consumer.accept(Optional.of(buf));
   143                 consumer.accept(Optional.of(buf));
   142             }
   144             }
   143             subscription.request(1);
   145             subscription.request(1);
   144         }
   146         }
   145 
   147 
   146         @Override
   148         @Override
   147         public void onError(Throwable throwable) {
   149         public void onError(Throwable throwable) {
       
   150             Objects.requireNonNull(throwable);
   148             result.completeExceptionally(throwable);
   151             result.completeExceptionally(throwable);
   149         }
   152         }
   150 
   153 
   151         @Override
   154         @Override
   152         public void onComplete() {
   155         public void onComplete() {
   170         private final Path file;
   173         private final Path file;
   171         private final OpenOption[] options;
   174         private final OpenOption[] options;
   172         private final FilePermission[] filePermissions;
   175         private final FilePermission[] filePermissions;
   173         private final CompletableFuture<Path> result = new MinimalFuture<>();
   176         private final CompletableFuture<Path> result = new MinimalFuture<>();
   174 
   177 
       
   178         private final AtomicBoolean subscribed = new AtomicBoolean();
   175         private volatile Flow.Subscription subscription;
   179         private volatile Flow.Subscription subscription;
   176         private volatile FileChannel out;
   180         private volatile FileChannel out;
   177 
   181 
   178         private static final String pathForSecurityCheck(Path path) {
   182         private static final String pathForSecurityCheck(Path path) {
   179             return path.toFile().getPath();
   183             return path.toFile().getPath();
   209                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
   213                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
   210         }
   214         }
   211 
   215 
   212         @Override
   216         @Override
   213         public void onSubscribe(Flow.Subscription subscription) {
   217         public void onSubscribe(Flow.Subscription subscription) {
       
   218             Objects.requireNonNull(subscription);
       
   219             if (!subscribed.compareAndSet(false, true)) {
       
   220                 subscription.cancel();
       
   221                 return;
       
   222             }
       
   223 
   214             this.subscription = subscription;
   224             this.subscription = subscription;
   215             if (System.getSecurityManager() == null) {
   225             if (System.getSecurityManager() == null) {
   216                 try {
   226                 try {
   217                     out = FileChannel.open(file, options);
   227                     out = FileChannel.open(file, options);
   218                 } catch (IOException ioe) {
   228                 } catch (IOException ioe) {
   426             return currentBuffer;
   436             return currentBuffer;
   427         }
   437         }
   428 
   438 
   429         @Override
   439         @Override
   430         public int read(byte[] bytes, int off, int len) throws IOException {
   440         public int read(byte[] bytes, int off, int len) throws IOException {
       
   441             Objects.checkFromIndexSize(off, len, bytes.length);
       
   442             if (len == 0) {
       
   443                 return 0;
       
   444             }
   431             // get the buffer to read from, possibly blocking if
   445             // get the buffer to read from, possibly blocking if
   432             // none is available
   446             // none is available
   433             ByteBuffer buffer;
   447             ByteBuffer buffer;
   434             if ((buffer = current()) == LAST_BUFFER) return -1;
   448             if ((buffer = current()) == LAST_BUFFER) return -1;
   435 
   449 
   468             return 1;
   482             return 1;
   469         }
   483         }
   470 
   484 
   471         @Override
   485         @Override
   472         public void onSubscribe(Flow.Subscription s) {
   486         public void onSubscribe(Flow.Subscription s) {
       
   487             Objects.requireNonNull(s);
   473             try {
   488             try {
   474                 if (!subscribed.compareAndSet(false, true)) {
   489                 if (!subscribed.compareAndSet(false, true)) {
   475                     s.cancel();
   490                     s.cancel();
   476                 } else {
   491                 } else {
   477                     // check whether the stream is already closed.
   492                     // check whether the stream is already closed.
   598             this.result = result;
   613             this.result = result;
   599         }
   614         }
   600 
   615 
   601         @Override
   616         @Override
   602         public void onSubscribe(Flow.Subscription subscription) {
   617         public void onSubscribe(Flow.Subscription subscription) {
       
   618             Objects.requireNonNull(subscription);
   603             if (!subscribed.compareAndSet(false, true)) {
   619             if (!subscribed.compareAndSet(false, true)) {
   604                 subscription.cancel();
   620                 subscription.cancel();
   605             } else {
   621             } else {
   606                 subscription.request(Long.MAX_VALUE);
   622                 subscription.request(Long.MAX_VALUE);
   607             }
   623             }
   612             Objects.requireNonNull(items);
   628             Objects.requireNonNull(items);
   613         }
   629         }
   614 
   630 
   615         @Override
   631         @Override
   616         public void onError(Throwable throwable) {
   632         public void onError(Throwable throwable) {
       
   633             Objects.requireNonNull(throwable);
   617             cf.completeExceptionally(throwable);
   634             cf.completeExceptionally(throwable);
   618         }
   635         }
   619 
   636 
   620         @Override
   637         @Override
   621         public void onComplete() {
   638         public void onComplete() {
   905                 subscriber.onError(new IllegalStateException(
   922                 subscriber.onError(new IllegalStateException(
   906                         "This publisher has already one subscriber"));
   923                         "This publisher has already one subscriber"));
   907             }
   924             }
   908         }
   925         }
   909 
   926 
       
   927         private final AtomicBoolean subscribed = new AtomicBoolean();
       
   928 
   910         @Override
   929         @Override
   911         public void onSubscribe(Flow.Subscription subscription) {
   930         public void onSubscribe(Flow.Subscription subscription) {
   912             subscriptionCF.complete(subscription);
   931             Objects.requireNonNull(subscription);
       
   932             if (!subscribed.compareAndSet(false, true)) {
       
   933                 subscription.cancel();
       
   934             } else {
       
   935                 subscriptionCF.complete(subscription);
       
   936             }
   913         }
   937         }
   914 
   938 
   915         @Override
   939         @Override
   916         public void onNext(List<ByteBuffer> item) {
   940         public void onNext(List<ByteBuffer> item) {
       
   941             Objects.requireNonNull(item);
   917             try {
   942             try {
   918                 // cannot be called before onSubscribe()
   943                 // cannot be called before onSubscribe()
   919                 assert subscriptionCF.isDone();
   944                 assert subscriptionCF.isDone();
   920                 SubscriberRef ref = subscriberRef.get();
   945                 SubscriberRef ref = subscriberRef.get();
   921                 // cannot be called before subscriber calls request(1)
   946                 // cannot be called before subscriber calls request(1)
   939                     "onError called before onSubscribe",
   964                     "onError called before onSubscribe",
   940                     throwable);
   965                     throwable);
   941             // onError can be called before request(1), and therefore can
   966             // onError can be called before request(1), and therefore can
   942             // be called before subscriberRef is set.
   967             // be called before subscriberRef is set.
   943             signalError(throwable);
   968             signalError(throwable);
       
   969             Objects.requireNonNull(throwable);
   944         }
   970         }
   945 
   971 
   946         @Override
   972         @Override
   947         public void onComplete() {
   973         public void onComplete() {
   948             // cannot be called before onSubscribe()
   974             // cannot be called before onSubscribe()