开发者

Java源码重读之ConcurrentHashMap详解

开发者 https://www.devze.com 2023-05-08 10:34 出处:网络 作者: U2647
目录0. 第一个属性 serialPersistentFields1. spread()2. taBAT()、casTabAt()、setTabAt()3. counterCells4. keySet、values、entrySet5. 构造方法6. putAll()7. tryPresize()8. resizeStamp()9.transfer()10.putVa
目录
  • 0. 第一个属性 serialPersistentFields
  • 1. spread()
  • 2. taBAT()、casTabAt()、setTabAt()
  • 3. counterCells
  • 4. keySet、values、entrySet
  • 5. 构造方法
  • 6. putAll()
  • 7. tryPresize()
  • 8. resizeStamp()
  • 9.transfer()
  • 10.putVal()
  • 11.initTable()
  • 12.helpTransfer()
  • 13.addCount()
  • 14.get()

如果你没有阅读过 Java 源码重读系列之 HashMap 这篇文章的话,建议你先读一下。以下所有内容的前提是你已阅读过以上的文章。

另外,凡是涉及到多线程、并发的东西从来就没有简单的,所以这次我们很难讲清楚 ConcurrentHashMap 中的所有内容,只能聚焦到以下几个内容

  • ConcurrentHashMap 的 get 操作
  • ConcurrentHashMap 的 put 操作
  • ConcurrentHashMap 的 resize() 操作
  • ConcurrentHashMap 是如何保证线程安全的

如果你想要了解的内容不在以上范围内,那就不用继续阅读了,以免浪费时间~

0. 第一个属性 serialPersistentFields

因为 ConcurrentHashMap 的逻辑比较复杂,所以我们直接从 serialPersistentFields 属性说起,它之前的这些属性等用到的时候我们再看就好了,你只要知道 这个属性之前还有一堆固定的属性就好了。

serialPersistentFields 属性是一个 ObjectStreamField 的数组,而且默认添加了三个元素。

    private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };

我们点到 ObjectStreamField 类中去,它的类头有一段这样的描述:

 * A description of a Serializable field from a Serializable class.  An array

 * of ObjectStreamFields is used to declare the Serializable fields of a class.

简单翻译一下就是,一个序列化类中可以序列化属性的描述。ObjectStreamFields 数组声明了这个类的可序列化的字段。

好了,这个类我们看到这就可以了,而且也知道了 ConcurrentHashMap 中 serialPersistentFields 属性的作用。就是声明了一下 ConcurrentHashMap 里有三个 属性可以被序列化。这三个属性分别是segments、segmentMask、segmentShift 。结束~

1. spread()

继续往下是 Node 类的定义,没什么好说的,我们遇到了第一个方法。

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

都是一些位运算。解释一下,这个方法会将 h 和 h 右移 16 位的数值进行异或(^)操作,得到的结果与 HASH_BITS 进行与(&)操作。和 HASH_BITS 进行与(&)操作,作用就是保证返回的结果的最高位一定是 0,也就是说,返回的结果一定是正数。(如果你对位运算没有什么概念的话,也可以不用纠结这个方法,这个方法的作用就是,给一个数,返回另外一个数。)

2. tabAt()、casTabAt()、setTabAt()

    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

这几个是其实是 ConcurrentHashMap 的核心操作方法。tabAt() 是获取,casTabAt() 是更新,并且是基于 CAS 方式实现的更新。setTabAt() 是插入。这些实现都使用了大名鼎鼎的 sun.misc.Unsafe 类。

如果你对这个类不熟悉的话,其实可以简单理解,这个类里的一些方法都是线程的。因为这个类提供的是一些硬件级别的原子操作。简单来说,sun.misc.Unsafe 类提供的方法都是线程安全的。理解到这里就可以了,再深入的内容,就不再本文范围内了。继续往下。

3. counterCells

继续往下的话,就看到了 tablenextTable,没什么说的,这个就是存储数据的数组了,至于 nextTable,通过注释可以看到,这个变量应该是只在扩容时使用到了,等用到的时候再说。

继续往下呢就是一些int 类型的值了,通过名字和注释也看不出来什么,直接跳过。等用到的时候再说。继续往下的话我们就看到了一个 CounterCell[] 数组了,点到这个类的定义,可以看到以下代码。

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

好像也没有多复杂,就一个使用了 volatile 标记的 数值。至于 sun.misc.Contended 注解,主要是解决 CPU 伪缓存 问题的,提高性能和效率使用的,可以先不用关注。

但是,如果你阅读一下注释的话,就会发现这里面大有文章。涉及到两个非常复杂的东西:LongAdder and Striped64。关于 LongAdder and Striped64 的内容也不在本文范围内,有兴趣的话可以搜一下相关的文章。不了解也没有关系,不影响阅读。我们继续往下看。

4. keySet、values、entrySet

再往下就是几个 view 变量了。

    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

看名字也应该能猜出来,这些变量应该是跟 HashMap 的 keySet()、values()、entrySet() 几个方法的作用类似。如果你点到它的定义就会看到,这几个类都继承了 CollectionView 这个类。

abstract static class CollectionView<K,V,E>
        implements Collection<E>, java.io.Serializable {
        private static final long serialVersionUID = 7249069246763182397L;
        final ConcurrentHashMap<K,V> map;
        CollectionView(ConcurrentHashMap<K,V> map)  { this.map = map; }
        //... ...

只看前面几行就可以了,内部有一个 ConcurrentHashMap 类型的变量,而且 CollectionView 只有一个带有 ConcurrentHashMap 参数的构造方法。盲猜也能猜到,上面的 xxxView 类内部操作的也都是 ConcurrentHashMap 存储的数据。了解这些就可以了,我们继续往下看。

5. 构造方法

第一个是个空构造方法没有什么好说的,先看第二个。

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

通过注释和名称我们应该能够知道,这个构造方法可以初始化 Map 的容量。有意思的是,计算 cap 的方法。不知道你还记不记得 HashMap 的初始容量的构造方法是怎么计算容量的。代码在下面

this.threshold = tableSizeFor(initialCapacity);

而 ConcurrentHashMap 则是将 initialCapacity 加上了 initialCapacity 的一半又加了 1 作为 tableSizeFor 的参数。其实就是为了解决 HashMap 存在的可能出现两次扩容的问题。

注意,这里使用的是 >>>,不是 >>>>> 的含义是无符号右移。它会把最高位表示正负的值也会右移,然后补 0。 所以 >>> 之后,一定是正数。如果 >>> 之前是正数的话,结果跟 >> 一致。如果是负数的话,就会出现一个很奇怪的正数。这是因为最高位表示负数的 1 也跟着右移了。由于代码里已经判断了小于 0 ,所以我们目前先按照除 2 理解即可。

还有一个点是,从代码来看,ConcurrentHashMap 的最大容量 好像 是用 sizeCtl 表示的。但是,如果仅仅是表示最大容量,为什么会定义一个这么奇怪的名字呢? Ctl 的后缀应该是 control 的简写。具体是怎么控制的呢?

继续往下,我们先跳过带有 Map 参数的构造方法,因为这个涉及到 putAll() 方法。

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

这两个构造方法其实可以算做一个,我们直接看下面那个复杂的。

先判断了一下参数的取值,然后更新了一下 initialCapacity 参数,然后根据参数计算 size,考虑到 loadFactor 可能小于 1,导致 int 值越界,所以转成了 long 类型。

关于 concurrencyLevel,给的注释是:并发更新线程的预估数量。那上面那段判断更新就不难理解了。假如我预估会有 20 个线程同时更新这个初始容量为 15 的 Map,这个时候的初始容量会自动的改为 20 。

好像没有什么问题?有意思的是, loadFactor 这个参数竟然没有保存!! 加载因子没有保存,那什么时候触发扩容呢?我们继续往下看。

6. putAll()

回到带有 Map 参数的构造方法。

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

没有什么复杂的,指定了下默认的初始容量(16)就直接 putAll(m); 了。

    public void putAll(Map<? extends K, ? extends V> m) {
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

好像也不难,先执行 tryPresize(m.size()); 应该是初始扩容, 然后再 for 循环进行 putVal() 操作。

7. tryPresize()

先看下方法名。尝试并行重置容量。里面的 P 应该是 parallel(并行) 的缩写。

    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc ==   + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

首先计算了下变量 c,这个是保存入参 size 个元素时需要的最大容量。

然后是一个 whlie 循环,因为我们是通过构造方法进来的,所以 sizeCtl 的值现在是默认值 16 ,table 现在是 null。这个时候就进入到了 if 的代码里了。

if 的条件里是判断了 compareAndSwapInt() 的结果。这里需要说一下,compareAndSwapInt 方法是 CAS 的一种实现。这个方法内部做了两件事情,首先是比较 this 这个对象的 SIZECTL 值是否跟 sc 相等,相等的话,把 SIZECTL 的值 改为 -1。而且 Unsafe 类还保证了线程的安全。如果有多个线程同时执行这个方法的话,只会有一个线程成功。不会出现两个线程都比较通过了,然后在赋值的时候产生覆盖的问题。

好像也不难理解,其实就是把 sizeCtl 值改成了 -1,而且只有一个线程会成功。这里的 sizeCtl 更像是一把锁,哪个线程改成了 -1 ,哪个线程就获取到了锁,那它就可以执行后面的流程了。

继续,因为上面已经对 tab = table 赋值了,所以下面的判断也能通过。然后,就看到了数组初始化的过程了。直接 new 了一个长度为 n 的 Node[]。并赋值给了 table。如果你往上追一下 n 的赋值,就会知道,现在的 n 正好是 c。就是方法一开始计算的值。

table 数组都已经初始化了,是不是结束了?并没有。这个时候更新了一下 sc。 >>> 2 相当于除 4,其实就是 sc 现在的值是 n 的 3/4 。而且在 finally 块中,更新了 sizeCtl。这个时候 sizeCtl 就不是 -1 了。根据我们之前的理解,这里更新 sizeCtl,应该是在释放锁。

然后,第一次 while 循环就结束了。再次进入 while 循环,这次 sc 是 n 的 3/4 了,上一次循环已经更新了 sizeCtl

这次 table 就不等于 null 了。而且根据我们之前的推断,现在的 sc 应该等于 n 的 3/4 ,而 n 之前又等于 c。所以, c <= sc 这个条件也不成立。

而且 n >= MAXIMUM_CAPACITY 这个条件大概率是在扩容到最大的时候才会成立。所以,就走到了最后一个条件分支里了。因为 sc 现在是大于 0 的,所以直接走到了最后一个分支。

PS: if (sc < 0) 这个分支好像永远不会执行,因为 while 进入的条件要求 sc >= 0,而在判断sc < 0 之前又没有任何地方会把 sc 更新为负数,所以好像一直都不会执行。如果我理解错了,希望有人能解惑一下~

8. resizeStamp()

首先执行了一下 resizeStamp() 方法。这个方法也是一个位运算的方法。你可以直接使用 main() 方法跑一下,会返回一个很难理解的正数。简单说一下这个数是怎么得出来的。

    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

首先, numberOfLeadingZeros() 会返回 n 的二进制串中从最左边算起连续的“0”的总个数。然后再跟 1 左移 15 位的值按位或(|)操作。 最终得到的就是一个在二进制中,第 16 位为 1 的正数。

继续回到代码,因为现在已经确定 sc 是 n 的 3/4 了(PS:如果这个 3/4 不理解,那换成 0.75 是不是会好点 ,好像跟 HashMap 的扩容因子一样,其实 sc 的值就是扩容阈值,这个后面会提到,现在不理解没关系),所以也是大于 0 的。这里又进行了一次 compareAndSwapInt()。这个时候赋值的是把 rs 左移了 16 位。 还记得 resizeStamp() 返回的结果么,第 16 位是 1。所以 rs 右移 16 位之后,最高位就是 1 了,在 int 类型里,二进制的最高位表示正负,1 表示负数。

所以,这个时候,又把 sizeCtl 更新成负值了。根据我们之前的理解,这里应该还是获取锁的操作。获取到锁之后,一般就是需要操作资源了。但是 table 我们不是已经初始化好了吗?这次又要初始什么呢?

记住,现在 sizeCtl 是一个负值,并且 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) 后面要用到!

9.transfer()

        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        //... ...

到这里还是比较好理解的。先初始了一下 striden 这两个变量。然后,因为我们是初始化进来的,所以 nextTab 一定等于 null。这个时候会初始化 nextTab。在创建数组的时候捕获了一个异常,这个异常出现的唯一情况就是内存不够了,分配不了 2 倍的 n 的数组。这个时候,将 sizeCtl 的值改为了 Integer.MAX_VALUE。然后就结束了。如果没有抛异常,会更新 nextTabletransferIndex 的值。

我们需要回头看下 tryPresize() 方法。如果在抛异常的时候结束,会出现什么情况。根据代码,异常结束后,会进入第三次循环,这次循环会进入第二个分支。因为 c <= sc 一定会成立。这里就会结束循环。

到这里,我们已经把 tryPresize() 循环里的三个分支都走完了,下面继续看 transfer() 这个方法。

nextTab 初始化之后,我们又看到了一个新的 Node 类:

    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

点到这个类的定义里我们会发现,这个类里面只有一个属性 nextTable 和一个 find() 方法。关于 find() 是在获取元素时才能用到,我们先不用关注。目前来看 ForwardingNode 其实就是对 nextTab 的一个封装。然后继续看 transfer()

两个boolean 类型的值,默认一个 true,一个 false。

下面的代码是一个 for 循环,但是这个循环有差不多 100 多行的代码(如果我在项目里遇到这种代码估计会骂人的~)。我们一点点看,首先是一个while 循环。

        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, 
                    nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }

首先 i = 0, bound = 0 ,所以,第一次循环不会进入第一个分支。然后,根据之前 transferIndex = n; 的赋值,也不会进入第二个分支。

这样就来到了第三个分支。compareAndSwapInt 会更新 transferIndex 的值,如果 CPU 的个数是 1,transferIndex 更新成 0 ,否则更新成 nextIndex - stride 。然后更新 bound、i、advance 的值,循环就结束了。

继续往下,现在 i 的值是 n - 1,所以不会命中 if (i < 0 || i >= n || i + n >= nextn) 条件。

然后,因我们是初始化时进入的,素以 tab 里的所有元素都是 null,第二个条件就通过了。

    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);

其实就是把 tab 的 i 位置 初始化了一个 fwd 元素。 到这里,第一次 for 循环就结束了。

第二次循环其实也很简单,首先 advance = false ,不会进入 while 循环,然后就会进入下面的判断

    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
    else if ((fh = f.hash) == MOVED)
        advance = true; // already processed

首先,获取了下 tab[i] 的值,因为上次循环已经赋值过了,现在 f = fwd。然后,有意思的来了,先看下 MOVED 的定义

    static final int MOVED     = -1; // hash for forwarding nodes

没错,MOVED = -1,按我们正常的理解,一个对象的 hash 值,怎么也不会等于 -1 吧,我们再回头看下 ForwardingNode 这个类

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //... ...
    }
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        //... ... 
    }

顺便贴了下父类的代码,主要看构造函数,看到了么? fwd 这个对象在初始化的时候,指定了 hash 值,就是 MOVED。OK,回到之前的循环。

这次循环就会把 advance 改为 true。第二次循环就结束了。

经过上面的两次循环之后,我们是其实只是执行了一行代码 tab[n-1] = fwd。现在进入第三次循环,之前 i = n - 1 。现在又执行了一次 if (--i >= bound || finishing),所以现在 i = n - 2 。但是 bound 可能有两种情况,一种是 bound = n - stride,一种是 bound = 0。我们先假设 bound = n - stride; stride = 16 。所以,第一个条件是成立的,执行 advance = false; ,然后 while 循环结束。

然后,第一个 if (i < 0 || i >= n || i + n >= nextn) 条件不会成立,又执行到了赋值操作里。这时 tab[n-2] = fwd。第三次循环结束~

然后第四次循环又会把 advance 改为 true

好好回味下~~

其实,这一顿操作下来就是在执行 tab[i] = fwd 这一行代码。搞了这么多东西,其实就是在支持多线程并发扩容,简单说下过程。

首先,while 循环会确定当前线程扩容的区间 [ bound,nextIndex ) 左开右闭。然后 while 循环下面的代码其实就是在给 tabnextTab 赋值。设想下,如果 while 循环里的 compareAndSwapInt 执行失败,会是什么情况?没错,会空转!结束只有两种情况,一种是 transferIndex = 0。说明已经有其他线程把所有的区间都认领了,另外一种情况是执行 compareAndSwapInt。认领 [ bound,nextIndex ) 的区间,进行扩容。

其实,你可以直接验证下的,打断点也好,手写也罢,假设n = 1024; NCPU = 4。这时 stride = 32。那么第一个线程会先对 tab[1024-32,1024-1]进行赋值。如果这时有其他线程进来,在 while 循环的时候,就会认领 tab[1024-32-32,1024-32-1] 的区间进行赋值。如果有更多的线程进来的话,就会加快这个过程。这个就是所谓的 并发扩容 ,也有叫 辅助扩容 的。

然后,我们来看下通过 synchronized 加锁的这段代码。能执行到这里的话只能是 tab[i].hash != MOVED。那就说明这里记录的是一个正常的数据。

首先判断了下 if (tabAt(tab, i) == f) 没什么说的,肯定成立,不成立就结束了,然后判断了下 if (fh >= 0)。有点奇怪,正常数据的 hash 还能小于 0 ?那我们先看下不成立的情况

 else if (f instanceof TreeBin) 

明白了吧,当发生 hash 冲突时,并且链表已经转成红黑树了,这时 tab[i] = TreeBin,那我们看下 TreeBin 的定义。

static final class TreeBin<K,V> extends Node<K,V> {
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        //... ...
    }
}
 static final int TREEBIN   = -2; // hash for roots of trees

真相了,TreeBin 的 hash 值是 -2,就小于 0。后面的代码我们就不说了,其实跟HashMap是一样的,如果当前节点是链表,那就采用高低位链表的形式对 nextTab 赋值,如果是 TreeBin 那就采用红黑树的方式进行赋值。而且,我们对 tab[i] 加了 synchronized 锁,也不会有线程竞争,老老实实赋值就好了。

最后,transfer 里的代码基本上都看完了,就剩下面这段了

    if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        if (finishing) {
            nextTable = null;
            table = nextTab;
            sizeCtl = (n << 1) - (n >>> 1);
            return;
        }
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            finishing = advance = true;
            i = n; // recheck before commit
        }
    }

while 循环里,有这么一行代码 i = -1; 执行了这个之后,就会进入上面的代码里。其实就是 tab 初始化完成之后,即 nextIndex = 0 的时候,就会执行 i = -1; ,然后就会进入上面的代码了。我们看下上面的代码。

首先,finishing 现在应该是等于 false 的,直接进入第二个 if。这个也很简单,首先 sc = sizeCtl,赋值,然后通过 CAS 将 sizeCtl 的值改为 sc - 1。还记得 sizeCtl 的值是多少么?? 我直接粘贴一下 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)。是不是跟 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 这个判断逻辑一致?如果不相等,说明有其他线程修改了sizeCtl,这同时说明有其他线程还在执行扩容的动作,即还在执行 tryPresize() 或者是 transfer() 方法。那么,因为当前线程已经执行完了,所以直接 return; 结束,让其他线程继续执行就好了。

如果相等,执行 finishing = advance = true; i = n。进入下一次 for 循环。

一顿判断之后,你会发现,还是会进入到上面的代码,而这时,finishing == true 了!下面的代码就不难理解了吧,更新 table ,相当于使用了新的数组了,而 sizeCtl 也更新了一下。都是位运算,如果你看不明白,可以用 main 方法跑一下。我们假设 n = 1024,那么 table 现在的大小也就是之前 nextTab 的大小,就是2048,然后,我们用 main 跑一下 sizeCtl 的值,不出意外的话应该等于 1536 。如果还看不明白,那么你再计算下 1536 / 2048。结果是 0.75 ,这个数字熟悉吧? HashMap 的默认加载因子!。没错,sizeCtl 其实就是下次需要扩容的阈值。

到这里,我们就把 transfer() 方法看完了。然后,我们重点总结下 sizeCtl 这个属性,不得不承认,这个设计非常的巧妙!

首先,通常情况下 sizeCtl 应该是等于下次的扩容阈值的。但是在扩容期间,有两个状态,一个是 -1,一个是非常大的一个负值。等于 -1 很好理解,相当于是一个锁,哪个线程更新成功,就可以进行数组的初始化动作。那么,就剩最后一种情况了。直接用下面的 main() 方法跑一下

    public static void main(String[] args) throws IllegalAccessException {
        int rs = Inte编程客栈ger.numberOfLeadingZeros(1024) | (1 << (16 - 1));
        int sizeCtl = (rs << 16) + 2;
        System.out.println(sizeCtl);
        System.out.println((sizeCtl<<16)>>16);
    }

会得到下面的结果

-2146107390

2

有意思了,(sizeCtl<<16)>>16,这个操作是先左移 16 位,再右移 16 位,相当于把 sizeCtl 的高 16 位都置为 0 了。得到了一个 2,其实,这个 2 就是说当前有 (2 - 1) 个线程在进行扩容操作。(PS: sizeCtl 注释里有写~)。具体是为什么,我们继续往下看。

transfer() 执行完,就回到了 tryPresize()。然后继续返回就到了 putAll()。继续往下执行就是 putVal() 方法了。

10.putVal()

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
        //... ...

首先判断了下 null,然后计算了下 hash 哈希值。就进入 for 循环了。首先是第一个分支。其实就是 tab 还没有初始化的时候会进入这个分支。

11.initTable()

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

好像也没有多少复杂的,因为我们之前已经对 sizeCtl 做了充分的解释,这里如果 sc < 0 的话,说明在扩容或者是初始化中,然后当前线程直接执行了 Thread.yield();,就是放弃 CPU 执行权等待下次分配 CPU 时间片,如果不小于 0 ,并且 tab = null ,那说明现在还没有线程执行扩容,那当前线程就会更新 sizeCtl,然后自己开始执行初始化动作,初始化好后直接返回 tab

继续回到 putVal() 方法,如果执行第二个分支,说明 tab[i] == null,这个位置还没有元素,直接通过 casTabAt() 方法进行赋值。如果这个位置有值,并且 (fh = f.hash) == MOVED 说明在扩容或者是在初始化,这个时候当前线程会进行 辅助扩容

12.helpTransfer()

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable)开发者_JAVA != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

首先在 if 条件里获取到 nextTab 如果不为 null,就会进入 while 循环,首先 sc < 0 说还在扩容或者是初始化中,while 循环里的第一个分支是不需要辅助扩容或者是已经达到最大的辅助线程数量,或者是已经剩最后一个线程在扩容了,其他的线程都结束了。所以直接 break; 就可以了。

第二个分支,首先会执行 sizeCtl + 1 的操作,执行成功就会执行 transfer() 方法,这个方法我们之前已经看过了,就不看了,需要注意的是,我们之前说过,sizeCtl 的低 16 位代表目前正在扩容的线程数减一。因为这里新加入一个线程参与扩容,所以对 sizeCtl 进行了加一的操作。如果还有线程进来,那么 sizeCtl 还会加一。这里就解释清楚 sizeCtl 的另外的一个用法了。扩容结束直接 break;。继续回到 putVal()

继续下一次 for,如果 tab[i] 还不等于 null,那就说明发生哈希冲突了,并且当前已经不在扩容了。就走到了最后一个分支,jBBTfU使用 synchronized 加锁的这一段代码里,这段代码其实并不复杂,发生冲突之后无非就两种情况,链表或者是红黑树。你可以看下 TreeBin 的构造方法。它的哈希值是 -2。

    static final int TREEBIN   = -2; // hash for roots of trees
    //... ...
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        //... ... 
    }

所以才有了 if (fh >= 0) 的判断,如果首节点的哈希值大于 0 ,那一定是链表。

最后还有一段进行树化的判断操作。

    static final int TREEIFY_THRESHOLD = 8;
    //... ...
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }

链表的节点数超过 8 就会进行树化操作。到这里其实 putVal() 的相关操作基本上已经结束了,就剩最后一个 addCount() 方法了,看名称也知道这个是更新计数器的,盲猜也能猜到应该跟元素数量有关系。不过,好像有点问题啊,你有没有发现,在整个 putVal() 方法里面,好像没有触发扩容的逻辑!!

13.addCount()

其实这个方法除了操作我们之前见到的 counterCells 属性外,还会判断是否需要进行扩容。因为只有在知道具体的元素数量后,才能判断出是否需要扩容。我们先看这个方法的第一段代码。

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //... ...

首先,我们假设目前是第一次执行这个方法,那么 counterCells == null,然后就会使用 CAS 执行 baseCount = b + x失败之后,就开始执行 fullAddCount() 方法了,因为现在 as == null 是成立的。

fullAddCount() 方法与 Striped64.longAccumulate() 方法基本上是一模一样的,因为之前已经跳过了 Striped64,所以这里也不打算去细看。直接总结下 counterCells 的作用。其实 counterCells 就是在多个线程同时更新 baseCount 失败时记录下新增的元素数量。

举个例子就明白了,假设 ConcurrentHashMap 初始化完成之后,有 2 个线程,同时执行了 addCount(),那么 baseCount 会更新成 1,counterCells 会初始化为一个大小为 2 的数组,且一个元素是 null,另外一个元素的 counterCells[i].value 值是 1。

如果这个时候又来了一个线程,会有 3 种情况,

  • CAS 更新 baseCount 成功,baseCount = 2。第三个线程结束
  • CAS 失败且 counterCells[ThreadLocalRandom.getProbe() & m] == null。继续初始化 counterCells 的另一个为 null 的元素,值为 1。
  • CAS 失败且 counterCells[ThreadLocalRandom.getProbe() & m] != null,那么 counterCells[i].value 值更新成 2。

ThreadLocalRandom.getProbe() 方法其实就是取了个随机值。就是说,如果有多个线程同时更新的话,失败的线程会随机的从 counterCells 取一个元素,将新增的数量保存进去。

其实很简单,能进入到 fullAddCount() 方法的条件只有一种,counterCells == null 并且 CAS 更新 baseCount 失败,这种情况就是有多个线程同时执行了 addCount() 方法,比如,有两线程同时执行 putVal(),那么必然有一个线程在 CAS 更新 baseCount 时会失败,这个时候就进入到 fullAddCount() 方法。这个方法内部就是在操作 counterCells 数组。操作的行为基本上就是下面这几种

要么是初始化 counterCells 数组

if (counterCells == as) {
    CounterCell[] rs = new CounterCell[2];
    rs[h & 1] = new CounterCell(x);
    counterCells = rs;
    init = true;
}

要么就是初始化 counterCells 数组元素

    CounterCell r = new CounterCell(x); // Optimistic create
    if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        boolean created = false;
        try {               // Recheck under lock
            CounterCell[] rs; int m, j;
            if ((rs = counterCells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                rs[j] = r;
                created = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (created)
            break;
        continue;           // Slot is now non-empty
    }

要么就是更新 counterCells 数组元素的值

    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        break;

还有一种操作就是 扩容 ,对 counterCells 进行扩容。

    if (cellsBusy == 0 &&
            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        try {
            if (counterCells == as) {// Expand table unless stale
                CounterCell[] rs = new CounterCell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
          python      counterCells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }

fullAddCount() 方法里,每次循环都会重新随机取元素 h = ThreadLocalRandom.advanceProbe(h);。如果执行循环了多次,都没有保存成功,说明 counterCells 的容量不够用了,就会触发扩容。从上面的代码里也能看到,counterCells 的扩容非常简单,数组直接翻倍,元素直接赋值到新数组里,位置都没有变。

继续回到 addCount() 方法,之后的逻辑就是判断了下 check 参数。其实这里的逻辑是,如果有多个线程同时操作,只要没有发生哈希冲突,就不进行扩容检查。你往回翻一下就可以看到 check 参数其实就是 tab[i] 位置的元素数量。

如果发生了哈希冲突,或者说没有多个线程同时操作(这个时候就进入不了当前的分支,更新 baseCount 不会失败),就会执行 s = sumCount(); 这个方法非常简单,就是对 baseCountcounterCells 里的数值进行了一下求和,然后就开始执行下面的代码。

    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }

首先 while 循环判断了下当前的元素数量是否超过了 siwww.devze.comzeCtl,即便是在扩容期间,sizeCtl 小于 0 的时候,也算成立。然后判断了下 tab ,基本上也会成立。直接进入循环内部

第一个分支 if (sc < 0) 说明已经有线程开始执行扩容动作了,这个时候更新 sizeCtl 的值加一,当前线程参与 辅助扩容

第二个分支是目前还没有线程进行扩容操作,那么当前线程开始执行扩容,(rs << RESIZE_STAMP_SHIFT) + 2) 这个数值我们之前已经看到过了,就不再赘述了。

循环最后重新计算 s ,扩容结束后 s 就不会小于 sizeCtl,方法结束。

好了,到这里我们基本上就把 ConcurrentHashMap 的 put 操作的逻辑看完了。其实整体上跟 HashMap还是比较类似的,基本上就是把所有对 tab 的操作都使用 Unsafe 包装了一下,解决多线程操作的问题,而发生哈希冲突时也是使用了 synchronized 进行了加锁,解决了多线程操作链表的覆盖问题。比较难的反而是元素数量的问题。因为 ConcurrentHashMap 一定要保证元素保存到 tab 成功后,元素数量一定也要加成功!不能因为元素数量的值更新失败了,再把保存到 tab 里的元素删除掉吧。所以呢 ConcurrentHashMap 就使用 counterCells 数组来保存那些更新 baseCount 失败的数量。

14.get()

下面我们看下 get() 方法

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length)jBBTfU > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

好像也不复杂,第一个分支是一次就从 tab[i] 位置找到了元素,直接返回。最后一个 while 循环是 tab[i] 位置发生了哈希冲突,且当前位置是链表,通过 while 循环遍历寻找。

重点说一下第二个分支吧 else if (eh < 0) 有两种情况,一种是 tab[i] 位置发生了哈希冲突,且当前位置是红黑树,这时 eTreeBin 类型的,因为涉及到红黑树,我们直接跳过,有兴趣的可以自己研究下。另外一种情况是在扩容期间,当前元素已经转移到新的 nextTable 上了,这时 e 的类型是 ForwardingNode 类型,我们直接看下 ForwardingNode 类的 find() 代码

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }

也不复杂,就是直接在 nextTable 找元素,如果 nextTable[i] 位置为 null 直接返回,否则就进入了 for (;;) 循环里,跟之前类似,第一个分支里是直接找到了元素,而 if (eh < 0) 也有两种情况,一个是扩容时转移到新的 nextTable,一个就是红黑树。最后就是链表了。

好了,到这里基本上所有的内容都结束了,最后还剩一点有意思的东西,就是 Traverser 类,这个类其实实现了在扩容期间,也能使 ConcurrentHashMap 可以高效的(不使用锁)遍历。代码不多,有兴趣的话可以读一下~

以下是遗留的一些内容,有机会再补吧

  • sun.misc.Unsafe 类。
  • LongAdder and Striped64
  • sun.misc.Contended 注解
  • TreeBin

以上就是Java源码重读之ConcurrentHashMap详解的详细内容,更多关于Java ConcurrentHashMap的资料请关注我们其它相关文章!

0

精彩评论

暂无评论...
验证码 换一张
取 消