src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branch http-client-branch
changeset 55858 cd5eeec735fb
parent 55847 3bac3bca4adb
child 55942 8d4770c22b63
equal deleted inserted replaced
55857:89c904d57ebe 55858:cd5eeec735fb
   360         @Override
   360         @Override
   361         public void onSubscribe(Flow.Subscription s) {
   361         public void onSubscribe(Flow.Subscription s) {
   362             if (!subscribed.compareAndSet(false, true)) {
   362             if (!subscribed.compareAndSet(false, true)) {
   363                 s.cancel();
   363                 s.cancel();
   364             } else {
   364             } else {
   365                 this.subscription = s;
   365                 // check whether the stream is already closed.
       
   366                 // if so, we should cancel the subscription
       
   367                 // immediately.
       
   368                 boolean closed;
       
   369                 synchronized(this) {
       
   370                     closed = this.closed;
       
   371                     if (!closed) {
       
   372                         this.subscription = s;
       
   373                     }
       
   374                 }
       
   375                 if (closed) {
       
   376                     s.cancel();
       
   377                     return;
       
   378                 }
   366                 assert buffers.remainingCapacity() > 1; // should contain at least 2
   379                 assert buffers.remainingCapacity() > 1; // should contain at least 2
   367                 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
   380                 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
   368                         + Math.max(1, buffers.remainingCapacity() - 1));
   381                         + Math.max(1, buffers.remainingCapacity() - 1));
   369                 s.request(Math.max(1, buffers.remainingCapacity() - 1));
   382                 s.request(Math.max(1, buffers.remainingCapacity() - 1));
   370             }
   383             }
   409             onNext(LAST_LIST);
   422             onNext(LAST_LIST);
   410         }
   423         }
   411 
   424 
   412         @Override
   425         @Override
   413         public void close() throws IOException {
   426         public void close() throws IOException {
       
   427             Flow.Subscription s;
   414             synchronized (this) {
   428             synchronized (this) {
   415                 if (closed) return;
   429                 if (closed) return;
   416                 closed = true;
   430                 closed = true;
   417             }
   431                 s = subscription;
   418             Flow.Subscription s = subscription;
   432                 subscription = null;
   419             subscription = null;
   433             }
       
   434             // s will be null if already completed
   420             if (s != null) {
   435             if (s != null) {
   421                  s.cancel();
   436                  s.cancel();
   422             }
   437             }
   423             super.close();
   438             super.close();
   424         }
   439         }