# HG changeset patch # User psandoz # Date 1386233093 -3600 # Node ID 48b31d370bc96ca9d039ed7fae34c2321089ca49 # Parent 393509a81cc34e24e558a1f314815f10c797ce6e 8028564: Concurrent calls to CHM.put can fail to add the key/value to the map Reviewed-by: psandoz, chegar, alanb Contributed-by: Doug Lea diff -r 393509a81cc3 -r 48b31d370bc9 jdk/src/share/classes/java/util/concurrent/ConcurrentHashMap.java --- a/jdk/src/share/classes/java/util/concurrent/ConcurrentHashMap.java Thu Dec 05 09:25:31 2013 +0100 +++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentHashMap.java Thu Dec 05 09:44:53 2013 +0100 @@ -49,7 +49,6 @@ import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Set; import java.util.Spliterator; import java.util.concurrent.ConcurrentMap; @@ -381,19 +380,21 @@ * progress. Resizing proceeds by transferring bins, one by one, * from the table to the next table. However, threads claim small * blocks of indices to transfer (via field transferIndex) before - * doing so, reducing contention. Because we are using - * power-of-two expansion, the elements from each bin must either - * stay at same index, or move with a power of two offset. We - * eliminate unnecessary node creation by catching cases where old - * nodes can be reused because their next fields won't change. On - * average, only about one-sixth of them need cloning when a table - * doubles. The nodes they replace will be garbage collectable as - * soon as they are no longer referenced by any reader thread that - * may be in the midst of concurrently traversing table. Upon - * transfer, the old table bin contains only a special forwarding - * node (with hash field "MOVED") that contains the next table as - * its key. On encountering a forwarding node, access and update - * operations restart, using the new table. + * doing so, reducing contention. A generation stamp in field + * sizeCtl ensures that resizings do not overlap. Because we are + * using power-of-two expansion, the elements from each bin must + * either stay at same index, or move with a power of two + * offset. We eliminate unnecessary node creation by catching + * cases where old nodes can be reused because their next fields + * won't change. On average, only about one-sixth of them need + * cloning when a table doubles. The nodes they replace will be + * garbage collectable as soon as they are no longer referenced by + * any reader thread that may be in the midst of concurrently + * traversing table. Upon transfer, the old table bin contains + * only a special forwarding node (with hash field "MOVED") that + * contains the next table as its key. On encountering a + * forwarding node, access and update operations restart, using + * the new table. * * Each bin transfer requires its bin lock, which can stall * waiting for locks while resizing. However, because other @@ -570,6 +571,23 @@ */ private static final int MIN_TRANSFER_STRIDE = 16; + /** + * The number of bits used for generation stamp in sizeCtl. + * Must be at least 6 for 32bit arrays. + */ + private static int RESIZE_STAMP_BITS = 16; + + /** + * The maximum number of threads that can help resize. + * Must fit in 32 - RESIZE_STAMP_BITS bits. + */ + private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; + + /** + * The bit shift for recording size stamp in sizeCtl. + */ + private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + /* * Encodings for Node hash fields. See above for explanation. */ @@ -727,7 +745,7 @@ * errors by users, these checks must operate on local variables, * which accounts for some odd-looking inline assignments below. * Note that calls to setTabAt always occur within locked regions, - * and so in principle require only release ordering, not need + * and so in principle require only release ordering, not * full volatile semantics, but are currently coded as volatile * writes to be conservative. */ @@ -2192,6 +2210,14 @@ /* ---------------- Table Initialization and Resizing -------------- */ /** + * Returns the stamp bits for resizing a table of size n. + * Must be negative when shifted left by RESIZE_STAMP_SHIFT. + */ + static final int resizeStamp(int n) { + return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); + } + + /** * Initializes table, using the size recorded in sizeCtl. */ private final Node[] initTable() { @@ -2245,17 +2271,20 @@ s = sumCount(); } if (check >= 0) { - Node[] tab, nt; int sc; + Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && - tab.length < MAXIMUM_CAPACITY) { + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); if (sc < 0) { - if (sc == -1 || transferIndex <= 0 || - (nt = nextTable) == null) + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) break; - if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1)) + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } - else if (U.compareAndSwapInt(this, SIZECTL, sc, -2)) + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } @@ -2267,11 +2296,15 @@ */ final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; - if ((f instanceof ForwardingNode) && + if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode)f).nextTable) != null) { - while (transferIndex > 0 && nextTab == nextTable && - (sc = sizeCtl) < -1) { - if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1)) { + int rs = resizeStamp(tab.length); + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } @@ -2309,9 +2342,21 @@ } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; - else if (tab == table && - U.compareAndSwapInt(this, SIZECTL, sc, -2)) - transfer(tab, null); + else if (tab == table) { + int rs = resizeStamp(n); + if (sc < 0) { + Node[] nt; + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + } } } @@ -2366,8 +2411,8 @@ sizeCtl = (n << 1) - (n >>> 1); return; } - if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, ++sc)) { - if (sc != -1) + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit @@ -2566,11 +2611,8 @@ private final void treeifyBin(Node[] tab, int index) { Node b; int n, sc; if (tab != null) { - if ((n = tab.length) < MIN_TREEIFY_CAPACITY) { - if (tab == table && (sc = sizeCtl) >= 0 && - U.compareAndSwapInt(this, SIZECTL, sc, -2)) - transfer(tab, null); - } + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) + tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { @@ -2768,7 +2810,7 @@ private final void contendedLock() { boolean waiting = false; for (int s;;) { - if (((s = lockState) & WRITER) == 0) { + if (((s = lockState) & ~WAITER) == 0) { if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) { if (waiting) waiter = null; @@ -2793,12 +2835,13 @@ */ final Node find(int h, Object k) { if (k != null) { - for (Node e = first; e != null; e = e.next) { + for (Node e = first; e != null; ) { int s; K ek; if (((s = lockState) & (WAITER|WRITER)) != 0) { if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; + e = e.next; } else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) { @@ -4454,7 +4497,7 @@ } public final boolean removeAll(Collection c) { - Objects.requireNonNull(c); + if (c == null) throw new NullPointerException(); boolean modified = false; for (Iterator it = iterator(); it.hasNext();) { if (c.contains(it.next())) { @@ -4466,7 +4509,7 @@ } public final boolean retainAll(Collection c) { - Objects.requireNonNull(c); + if (c == null) throw new NullPointerException(); boolean modified = false; for (Iterator it = iterator(); it.hasNext();) { if (!c.contains(it.next())) { diff -r 393509a81cc3 -r 48b31d370bc9 jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentAssociateTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentAssociateTest.java Thu Dec 05 09:44:53 2013 +0100 @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiConsumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * @test + * @bug 8028564 + * @run testng ConcurrentAssociateTest + * @summary Test that association operations, such as put and compute, + * place entries in the map + */ +@Test +public class ConcurrentAssociateTest { + + // The number of entries for each thread to place in a map + private static final int N = Integer.getInteger("n", 128); + // The number of iterations of the test + private static final int I = Integer.getInteger("i", 256); + + // Object to be placed in the concurrent map + static class X { + // Limit the hash code to trigger collisions + int hc = ThreadLocalRandom.current().nextInt(1, 9); + + public int hashCode() { return hc; } + } + + @Test + public void testPut() { + test("CHM.put", (m, o) -> m.put(o, o)); + } + + @Test + public void testCompute() { + test("CHM.compute", (m, o) -> m.compute(o, (k, v) -> o)); + } + + @Test + public void testComputeIfAbsent() { + test("CHM.computeIfAbsent", (m, o) -> m.computeIfAbsent(o, (k) -> o)); + } + + @Test + public void testMerge() { + test("CHM.merge", (m, o) -> m.merge(o, o, (v1, v2) -> v1)); + } + + @Test + public void testPutAll() { + test("CHM.putAll", (m, o) -> { + Map hm = new HashMap<>(); + hm.put(o, o); + m.putAll(hm); + }); + } + + private static void test(String desc, BiConsumer, Object> associator) { + for (int i = 0; i < I; i++) { + testOnce(desc, associator); + } + } + + static class AssociationFailure extends RuntimeException { + AssociationFailure(String message) { + super(message); + } + } + + private static void testOnce(String desc, BiConsumer, Object> associator) { + ConcurrentHashMap m = new ConcurrentHashMap<>(); + CountDownLatch s = new CountDownLatch(1); + + Supplier sr = () -> () -> { + try { + s.await(); + } + catch (InterruptedException e) { + } + + for (int i = 0; i < N; i++) { + Object o = new X(); + associator.accept(m, o); + if (!m.containsKey(o)) { + throw new AssociationFailure(desc + " failed: entry does not exist"); + } + } + }; + + int ps = Runtime.getRuntime().availableProcessors(); + Stream runners = IntStream.range(0, ps) + .mapToObj(i -> sr.get()) + .map(CompletableFuture::runAsync); + + CompletableFuture all = CompletableFuture.allOf( + runners.toArray(CompletableFuture[]::new)); + + // Trigger the runners to start associating + s.countDown(); + try { + all.join(); + } catch (CompletionException e) { + Throwable t = e.getCause(); + if (t instanceof AssociationFailure) { + throw (AssociationFailure) t; + } + else { + throw e; + } + } + } +} diff -r 393509a81cc3 -r 48b31d370bc9 jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentContainsKeyTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/util/concurrent/ConcurrentHashMap/ConcurrentContainsKeyTest.java Thu Dec 05 09:44:53 2013 +0100 @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.testng.annotations.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * @test + * @bug 8028564 + * @run testng ConcurrentContainsKeyTest + * @summary Test that entries are always present in the map, + * when entries are held within one bin that is a tree + */ +@Test +public class ConcurrentContainsKeyTest { + + // The number of entries for each thread to place in a map + // Should be > ConcurrentHashMap.TREEIFY_THRESHOLD but small + // enough to allow for enough iteration overlap by multiple threads + private static final int N = Integer.getInteger("n", 16); + // The number of rounds each thread performs per entry + private static final int R = Integer.getInteger("r", 32); + // The number of iterations of the test + private static final int I = Integer.getInteger("i", 256); + + // Object to be placed in the concurrent map + static class X implements Comparable { + + private final int a; + + X(int a) { + this.a = a; + } + + public int compareTo(X o) { + return this.a - o.a; + } + + public int hashCode() { + // Return the same hash code to guarantee collisions + return 0; + } + } + + @Test + public void testContainsKey() { + X[] content = IntStream.range(0, N).mapToObj(i -> new X(i)).toArray(X[]::new); + // Create map with an initial size >= ConcurrentHashMap.TREEIFY_THRESHOLD + // ensuring tree'ification will occur for a small number of entries + // with the same hash code + ConcurrentHashMap m = new ConcurrentHashMap<>(64); + Stream.of(content).forEach(x -> m.put(x, x)); + test(content, m); + } + + + private static void test(X[] content, ConcurrentHashMap m) { + for (int i = 0; i < I; i++) { + testOnce(content, m); + } + } + + static class AssociationFailure extends RuntimeException { + AssociationFailure(String message) { + super(message); + } + } + + private static void testOnce(Object[] content, ConcurrentHashMap m) { + CountDownLatch s = new CountDownLatch(1); + + Supplier sr = () -> () -> { + try { + s.await(); + } + catch (InterruptedException e) { + } + + for (int i = 0; i < R * N; i++) { + Object o = content[i % content.length]; + if (!m.containsKey(o)) { + throw new AssociationFailure("CHM.containsKey failed: entry does not exist"); + } + } + }; + + int ps = Runtime.getRuntime().availableProcessors(); + Stream runners = IntStream.range(0, ps) + .mapToObj(i -> sr.get()) + .map(CompletableFuture::runAsync); + + CompletableFuture all = CompletableFuture.allOf( + runners.toArray(CompletableFuture[]::new)); + + // Trigger the runners to start checking key membership + s.countDown(); + try { + all.join(); + } + catch (CompletionException e) { + Throwable t = e.getCause(); + if (t instanceof AssociationFailure) { + throw (AssociationFailure) t; + } + else { + throw e; + } + } + } +}