src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
equal
deleted
inserted
replaced
360 @Override |
360 @Override |
361 public void onSubscribe(Flow.Subscription s) { |
361 public void onSubscribe(Flow.Subscription s) { |
362 if (!subscribed.compareAndSet(false, true)) { |
362 if (!subscribed.compareAndSet(false, true)) { |
363 s.cancel(); |
363 s.cancel(); |
364 } else { |
364 } else { |
365 this.subscription = s; |
365 // check whether the stream is already closed. |
|
366 // if so, we should cancel the subscription |
|
367 // immediately. |
|
368 boolean closed; |
|
369 synchronized(this) { |
|
370 closed = this.closed; |
|
371 if (!closed) { |
|
372 this.subscription = s; |
|
373 } |
|
374 } |
|
375 if (closed) { |
|
376 s.cancel(); |
|
377 return; |
|
378 } |
366 assert buffers.remainingCapacity() > 1; // should contain at least 2 |
379 assert buffers.remainingCapacity() > 1; // should contain at least 2 |
367 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " |
380 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " |
368 + Math.max(1, buffers.remainingCapacity() - 1)); |
381 + Math.max(1, buffers.remainingCapacity() - 1)); |
369 s.request(Math.max(1, buffers.remainingCapacity() - 1)); |
382 s.request(Math.max(1, buffers.remainingCapacity() - 1)); |
370 } |
383 } |
409 onNext(LAST_LIST); |
422 onNext(LAST_LIST); |
410 } |
423 } |
411 |
424 |
412 @Override |
425 @Override |
413 public void close() throws IOException { |
426 public void close() throws IOException { |
|
427 Flow.Subscription s; |
414 synchronized (this) { |
428 synchronized (this) { |
415 if (closed) return; |
429 if (closed) return; |
416 closed = true; |
430 closed = true; |
417 } |
431 s = subscription; |
418 Flow.Subscription s = subscription; |
432 subscription = null; |
419 subscription = null; |
433 } |
|
434 // s will be null if already completed |
420 if (s != null) { |
435 if (s != null) { |
421 s.cancel(); |
436 s.cancel(); |
422 } |
437 } |
423 super.close(); |
438 super.close(); |
424 } |
439 } |