jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
changeset 44639 5c2838d882a5
parent 43999 4cc44dd9f14f
child 45531 fb3dbffad37b
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Apr 11 10:12:27 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Apr 11 16:32:38 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -670,11 +670,21 @@
 
     @Override
     CompletableFuture<Response> getResponseAsync(Executor executor) {
-        CompletableFuture<Response> cf;
+        CompletableFuture<Response> cf = null;
+        // The code below deals with race condition that can be caused when
+        // completeResponse() is being called before getResponseAsync()
         synchronized (response_cfs) {
             if (!response_cfs.isEmpty()) {
+                // This CompletableFuture was created by completeResponse().
+                // it will be already completed.
                 cf = response_cfs.remove(0);
+                // if we find a cf here it should be already completed.
+                // finding a non completed cf should not happen. just assert it.
+                assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
             } else {
+                // getResponseAsync() is called first. Create a CompletableFuture
+                // that will be completed by completeResponse() when
+                // completeResponse() is called.
                 cf = new MinimalFuture<>();
                 response_cfs.add(cf);
             }
@@ -708,7 +718,7 @@
                     cf.complete(resp);
                     response_cfs.remove(cf);
                     return;
-                }
+                } // else we found the previous response: just leave it alone.
             }
             cf = MinimalFuture.completedFuture(resp);
             Log.logTrace("Created completed future (streamid={0}): {1}",
@@ -742,10 +752,13 @@
      */
     void completeResponseExceptionally(Throwable t) {
         synchronized (response_cfs) {
-            for (CompletableFuture<Response> cf : response_cfs) {
+            // use index to avoid ConcurrentModificationException
+            // caused by removing the CF from within the loop.
+            for (int i = 0; i < response_cfs.size(); i++) {
+                CompletableFuture<Response> cf = response_cfs.get(i);
                 if (!cf.isDone()) {
                     cf.completeExceptionally(t);
-                    response_cfs.remove(cf);
+                    response_cfs.remove(i);
                     return;
                 }
             }