开发者

LongAdder原理及创建使用示例详解

开发者 https://www.devze.com 2023-01-14 10:30 出处:网络 作者: Afu
目录LongAdder介绍1.Atomic原子类2.LongAdder原理2.查看LongAdder的add方法3.longAccumulate方法3.1.创建Cell数组3.2.创建桶位Cell对象3.3.扩容Cell数组4.总结5.附上源码解析5.1.add方法5.2.longAccumulate方法LongA
目录
  • LongAdder介绍
    • 1.Atomic原子类
    • 2.LongAdder原理
    • 2.查看LongAdder的add方法
    • 3.longAccumulate方法
      • 3.1.创建Cell数组
      • 3.2.创建桶位Cell对象
      • 3.3.扩容Cell数组
    • 4.总结
      • 5.附上源码解析
        • 5.1.add方法
        • 5.2.longAccumulate方法

    LongAdder介绍

    1.Atomic原子类

    Atomic的原子类内部使用的是CAS原则,CAS是一个乐观锁,但是如果是在高并发的情况下的话,多个线程不断地竞争

    CAS的不断的自旋,非常耗CPU.

    高并发环境下,value变量其实是一个热点,也就是多个线程竞争一个热点。

    这时候就需要使用的 LongAdder 来替代 Atomic类

    2.LongAdder原理

    LongAdder的原理就是分散热点,将value分散到一个数组中,不同的线程去找自己的对应的Cell进行修改值,

    各个线程对Cell进行CAS操作,这样热点就被分散了,冲突的概率小了,性能就提高了.

    如果要返回实际的值,返回所有的数组中的值和base值就行.

    LongAdder原理及创建使用示例详解

    2.查看LongAdder的add方法

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.lengthttp://www.devze.comh - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    

    一看到这种代码就头皮发麻

    简单说明一http://www.devze.com下这几行代码作用,说多了,你也懒得看

    第一个if作用:

    如果还没有初始化cells数组,就去修改base值

    如果修改Base值失败,说明多个线程对base值修改发生了竞争

    第二个if作用:

    判断有没有cells有没有值,有的话说明已经初始化过cells数组了

    知道对应的桶位添加值或者修改值

    我感觉你已经不知道我在说什么了!

    反正你就知道,如果有多个线程发生了竞争,就去cells数组中找对应的桶位的cell添加或者修改值即可

    3.longAccumulate方法

    这里的代码特别的恶心,是能助眠的好代码,建议收藏!

    简单说明一下几个核心的代码

    3.1.创建Cell数组

      else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {
                        if (cells == as) {
                            Cell[] rs = new Cell[2];
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (init)
                        break;
                }
    

    Cell数组还没有进行初始化,

    创建长度为2的Cell数组

    在索引1的位置存储第一个Cell对象

    3.2.创建桶位Cell对象

                    if ((as = cells) != null && (n = as.length) > 0) {    
                  if ((a = as[(n - 1) & h]) == null) {
                        if (cellsBusy == 0) {     
                            Cell r = new Cell(x);  
                            if (cellsBusy == 0 && casCellsBusy()) {
                                boolean created = false;
                                try {             
                                    Cell[] rs; int m, j;
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    cellsBusy = 0;
                                }
                                if (created)
                                    break;
                                continue;          
                            }
                        }
    

    m - 1) & h 通过路由寻址公式找到对应的索引位置的桶位为空,然后把创建的Cell对象存储进去

    更直白就是说,你要蹲坑了,你要去找坑,如果坑位没有人,你就进去

    3.3.扩容Cell数组

     else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
                            if (cells == as) {     
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                 
                    }
    

    数组长度不够时,2倍扩容

    4.总结

    你现在肯定是云里雾里,讲的都是什么东西.

    你现在就只需要知道 LongAdder

    通过 base 和 Cell数组 换取更高的性能

    5.附上源码解析

    当然这里你可以跳过了,不用看了

    5.1.add方法

    public void add(long x) {
            // as 表示cells引用
            // b 表示获取的base值
            // v 表示期望值
            // m 表示cells数组的长度
            // a 表示当前线程命中cell单元格
            Cell[] as; long b, v; int m; Cell a;
            //条件一: --> true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中
            //        --> false 表示cells还没有初始化过,当前线程所有的数据都被写到base中
            //条件二: --> false 表示cas替换值成功
            //        --> true 表示发生竞争了,可能需要重试 或者 扩容
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                //什么时候会进来?
                //1.true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中
                //2.true cells还没有初始化,但是发生竞争了
                // true -> 未竞争  false ->发生竞争
                boolean uncontended = true;
                //as == null || (m = as.length - 1) < 0 看做是一个条件
                //条件一:true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,也就是通过第二个条件进来的
                //      false --> 说明cells已经初始化了,可以去寻找自己的cell进行赋值
                //条件二: getProbe() 可以理解为获取当前线程的hash值, m表示cells长度-1  注意:cells的次方数一定是2的次方
                // true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
                // false --> 说明当前线程对应的下标cell不为空,下一步需要将 x,添加到cell中
                //条件三: true --> 代表cas失败,代表当前线程对应的cell,有竞争
                //        false --> 表示cas成功
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    //都哪些情况会调用这里的方法?
                    //1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的
                    //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
                    //3.true --> 代表cas失败,代表当前线程对应的cell,有竞争
                    longAccumulate(x, null, uncontended);
            }
        }
    

    5.2.longAccumulate方法

     /**
         * //1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,到时候会进行初始化cell
         * //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
         * //3.true --> 对应的下标也有cell,但是cas失败,代表当前线程对应的cell,有竞争
         */
        //wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true
        final void longAccumulate(long x, LongBinaryOperator fn,
                                  boolean wasUncontended) {
            //表示线程的hash值
            int h;
            //条件成立,说明当前线程还未分配hash值
            if ((h = getProbe()) == 0) {
         开发者_C学习       //给当前线程分配hash值
                ThreadLocalRandom.current(); // force initialization
                //取出当前线程的hash值赋值给h
                h = getProbe();
                //为什么?默认情况下,可定写入到了cell[0]的位置,不把他当做一次真正的竞争??
                wasUncontended = true;
            }
            //表示扩容意向, false表示不会扩容 , true有可能会扩容
            boolean collide = false;                // True if last slot nonempty
            //自旋
            for (;;) {
                //as 表示cells引用
                //a 表示当前线程命中的cell
                //n 表示cell的数组长度
                //v 表示期望值
                Cell[] as; Cell a; int n; long v;
                //情况1:as不为空表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
                if ((as = cells) != null && (n = as.length) > 0) {
                    //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
                    //3.true --> 代表cas失败,代表当前线程对应的cell,有竞争[重试或者扩容]
                    //情况1.1: true 表示当前线程对应的下标位置的cell为null,需要创建cell
                    if ((a = as[(n - 1) & h]) == null) {
                        //true 表示当前锁未被占用,false-->表示锁被占用
                        if (cellsBusy == 0) {       // Try to attach new Cell
                            //拿x创建cell
                            Cell r = n编程ew Cell(x);   // Optimistically create
                            //条件一:true 表示当前锁未被占用,false-->表示锁被占用
                            //条件二:true表示当前线程获取锁成功,false表示当前线程获取锁失败,
                            if (cellsBusy == 0 && casCellsBusy()) {
                                //是否创建成功的标记
                                boolean created = false;
                                try {               // Recheck under lock
                                    //rs表示cells引用
                                    //m 表示cells长度
                                    //j 表示当前线程命中的下标
                                    Cell[] rs; int m, j;
                                    //条件一,条件二,恒成立的
                                    //rs[j = (m - 1) & h] == null 为了防止其他线程已经初始化话过了,防止当前线程再次修改,覆盖掉原来的cell
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    cellsBusy = 0;
                                }
                                if (created)
                                    break;
                                continue;           // Slot is now non-empty
                            }
                        }
                        //扩容意向改为false
                        collide = false;
                    }
                    //情况1.2
                    //wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true
                    else if (!wasUncontended)       // CAS already known to fail
                        wasUncontended = true;      // Continue after rehash
                    //情况1.3 :当前线程rehash过后,然后新命中的cell不为空
                    //true --> 写成功 退出
                    //false -- > rehash过后命中的cell,也有竞争  这里为重试一次,再重试一次
                    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                 fn.applyAsLong(v, x))))
                        break;
                        //情况1.4.
                        // 条件一: n >= NCPU  true --> 扩容意向改为False ,表示不扩容了   false --> 表示cells还可以扩容
                       //条件二:  true --> 表示其他线程已经扩容过了,当前线程rehash之后重试即可
                    else if (n >= NCPU || cells != as)
                        collide = false;            // At max size or stale
                    //默认开始为false 的, 设置为true,需要扩容,但是不一定真的发生扩容
                    else if (!collide)
                        collide = true;
                    //情况6.真正的扩容逻辑
                    //条件一: cellsBusy == 0 true  --> 表示当前无锁状态,当前线程可以去竞争这把锁
                    //条件二:casCellsBusy() true --> 表示当前线程获取锁成功,可以执行扩容逻辑 ,false 表示有其他线程在做相关的操作
                    // 尝试两次之后
                    else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
                            if (cells == as) {      // Expand table unless stale
                     javascript           //n<<1 翻倍
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            //释放锁
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   // Retry with expanded table
                    }
                    //重置当前线程hash值
                    h = advanceProbe(h);
                }
                //情况2:前置条件,cells为进行初始化,as为null
                //条件一: cellsBusy为true当前未加锁
                //条件二: cells == as 为什么? 因为其他线程在你给as赋值之后修改了cells
                //条件三: true 表示获取锁成功 ,会把cellBusy 设置为 1 , false 表示其他线程在持有这个锁
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {
                        // 为什么还有一次判断呢?
                        //防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据
                        if (cells == as) {
                            Cell[] rs = new Cell[2];
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        //释放锁
                        cellsBusy = 0;
                    }
                    if (init)
                        break;
                }
                //情况三:
                //1.当前cellBusy加锁状态,表示其他线程正在初始化cells,所以当前线程累加到base
                //2.cells被其他线程初始化之后,当前线程需要将数据累加到base
                else if (casBase(v = base, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                编程客栈    break;                          // Fall back on using base
            }
        }

    以上就是LongAdder原理及创建使用示例详解的详细内容,更多关于LongAdder创建使用的资料请关注我们其它相关文章!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号