LongAdder源码分析_longadder重置-程序员宅基地

技术标签: java  collection集合  多线程  开发语言  

1、概述

老铁们,发现一篇文章写的贼好,我把链接放在这里了。
一:http://www.wazhi.com.cn/SchoolManage/NewsDispatcher?NewsId=942ee429-0c82-4e3b-8df3-4910795d7cfc&SchoolId=1166&action=singlenews
二:https://blog.csdn.net/JMU_software19/article/details/122020917
AtomicLong 通过 CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能已经很好了,但是 JDK 开发组并不满足于此。使用 AtomicLong 时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS 的操作,而这会白白浪费 CPU 资源,所以引入了LongAdder类。
LongAdder内部维护了多个Cell变量,每一个cell变量有一个初始值为0的long类型变量。这样争夺一个原子变量就改成了争夺多个原子变量提高了性能,最后的结果累加base和每一个cell得到。
在这里插入图片描述

public class TestLongAdder {
    
    private static Integer integer = 0;

    private static final Integer THREAD_NUM = 50;//线程数

    private static final Integer COUNT = 100 * 10000;//每个线程计数次数

    private static LongAdder longAdder = new LongAdder();

    private static AtomicLong atomicLong = new AtomicLong();

    public static void main(String[] args) throws InterruptedException{
    
        testAtomicLong();
    }

    private static void testAtomicLong() throws InterruptedException {
    
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
        long start = System.currentTimeMillis();
        long a = 1L;
        for (int i = 0; i < THREAD_NUM; i++) {
    
            Thread thread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    try {
    
                        for (int j = 0; j < COUNT; j++) {
    
                            atomicLong.incrementAndGet();
                        }
                    } finally {
    
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("testAtomicLong结果" + atomicLong + "耗时:" + (end - start) + "毫秒");
    }

    private static void testLongAdder() throws InterruptedException {
    
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
        long start = System.currentTimeMillis();
        for (int i = 0; i < THREAD_NUM; i++) {
    
            Thread thread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    try {
    
                        for (int j = 0; j < COUNT; j++) {
    
                            longAdder.add(1L);
                        }
                    } finally {
    
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("testLongAdder结果" + longAdder + "耗时:" + (end - start) + "毫秒");
    }

}

从下面的结果可以看出,线程数比较少时使用AtomicLong有优势但是差别不大,线程数越多LongAdder的优势越明显。
在这里插入图片描述

2、源码分析

下面的源码分析大家hold住
请添加图片描述

2.1 add

    public void add(long x) {
    
    	//as表示cell[]的引用,b表示获取的base的值,v表示期望值,m表示cells数组的长度,a表示当前线程命中的单元格
        Cell[] as; long b, v; int m; Cell a;
        //条件一表示cells已经初始化过了,当前线程应该将数据写入对应的cell中,条件一失败进入条件二,条件二表示cells没有初始化,进入casBase(b = base, b + x),即当前线程应该将数据写入到base变量中
        //casBase如果为true表示当前线程自增操作成功,如果为false表示发生竞争可能需要重试或者扩容
        if ((as = cells) != null || !casBase(b = base, b + x)) {
    
        //什么时候会进来?
        //1.cells已经初始化过了,当前线程应该将数据写入到对应的cell中2.表示当前线程写入base时cas失败需要重试或者扩容

			//true表示没有发生竞争,false表示发生竞争
            boolean uncontended = true;
            //条件一为true说明cells没有初始化,false说明cells已经初始化了当前线程应该找自己的cell写值
            //条件二为true说明当前线程对应下标的cell为空需要创建。false说明当前线程对应的cell不为空需要将x值添加到cell中。m表示cells数组的长度-1,cells数组的长度一定是2^n,这样可以将乘法运算转为与运算
            //条件三为true说明cas将x加到cell中失败,false说明将x加到cell成功有竞争
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //都有哪些情况会进来?
               	//1.true->说明cells未初始化,也就是多线程写base发生竞争【重试|初始化cells】
               	//2.true->说明当前线程对应下标的cell为空【需要创建】
               	//3.true->表示cas失败,意味着当前线程对应的cell有竞争【重试|扩容】
                longAccumulate(x, null, uncontended);
        }
    }

2.2 casebase

    final boolean casBase(long cmp, long val) {
    
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

2.3 getProbe

获取当前线程的hash值

    static final int getProbe() {
    
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

2.4 Striped64

//计算机cpu数量,控制cells数组的关键条件
static final int NCPU = Runtime.getRuntime().availableProcessors();
//没有发生竞争时数据会累加到base上,或者cells数组初始化时有线程已经先一步初始化
transient volatile long base;
//cellsBusy用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。0表示无锁状态
transient volatile int cellsBusy;
//通过cas方式获取锁
    final boolean casCellsBusy() {
    
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
//重置当前线程的hash值
    static final int advanceProbe(int probe) {
    
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

2.3 longAccumulate

//1.true->说明cells未初始化,也就是多线程写base发生竞争【重试|初始化cells】
//2.true->说明当前线程对应下标的cell为空【需要创建】
//3.true->表示cas失败,意味着当前线程对应的cell有竞争【重试|扩容】
//x表示要加的值,fn表示计算的接口,waUncontended表示是否发生竞争,一般情况为true表示没有发生过竞争只有cells初始化之后,并且当前线程竞争修改失败才是false
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    
        //h表示当前线程的hash值                      
        int h;
        //条件成立说明当前线程还没有分配hash值
        if ((h = getProbe()) == 0) {
    
        	//给当前线程分配hash值
            ThreadLocalRandom.current(); // force initialization
            //取出当前线程的hash值赋值给h
            h = getProbe();
            //为什么将wasUncontended 置为true?默认情况下线程第一次进来都会在cells[0]累加数据,此时cas失败并不意味着发生竞争所以将wasUncontended改为true
            wasUncontended = true;
        }
        //false表示扩容意向,false一定不会扩容,true可能会扩容
        boolean collide = false;                // True if last slot nonempty
        //自旋
        for (;;) {
    
        	//as表示cells引用,a表示当前线程命中的cell,n表示cells数组长度,v表示期望值
            Cell[] as; Cell a; int n; long v;
            //case1:表示当前线程已经被初始化了,当前线程应该将数据写入到对应的cell中
            //2.true->说明当前线程对应下标的cell为空【需要创建】
			//3.true->表示cas失败,意味着当前线程对应的cell有竞争【重试|扩容】
            if ((as = cells) != null && (n = as.length) > 0) {
    
            	//case1.1:true表示当前线程对应的下标位置的cell为null,需要创建新的cell
                if ((a = as[(n - 1) & h]) == null) {
    
                	//true 表示锁没被占用,false表示锁被占用
                    if (cellsBusy == 0) {
           // Try to attach new Cell
                    	//用当前的x创建cell
                        Cell r = new Cell(x);   // Optimistically create
                        //条件一:true表示当前锁没有被占用,false表示当前锁被占用
                        //条件二:true表示当前线程获取锁成功,false表示当前线程获取锁失败
                        if (cellsBusy == 0 && casCellsBusy()) {
    //(a)
                        //是否创建成功 标记
                            boolean created = false;
                            try {
                   // Recheck under lock
                            //rs表示当前cells引用,m表示当前cells长度,j表示当前线程命中的下标
                                Cell[] rs; int m, j;
                                //条件一条件二总是成立,(a = as[(n - 1) & h]) == null和rs[j = (m - 1) & h] == null为什么需要判断两次?假设线程a和线程b(a)时,线程a执行完cellsBusy之后让出cpu,线程b继续执行,线程b创建了cell并且放到了cells数组中,此时线程a重新获得cpu再次进入会覆盖掉线程b的值所以需要再次判断
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
    
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
    
                                cellsBusy = 0;
                            }
                            //如果created为false那么会再次rehash重试
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    //如果锁被占用将扩容意向改为false
                    collide = false;
                }
                //case1.2:只有cells已经被初始化之后,并且当前线程竞争修改失败才会是false
                else if (!wasUncontended)       // CAS already known to fail
                //将wasUncontended置为true之后走到h = advanceProbe(h);重置当前线程hash值
                    wasUncontended = true;      // Continue after rehash
                //case1.3:当前线程rehash过hash值,新命中的cell不为空,如果成功直接退出循环;如果失败表示rehash之后命中的新cell也有竞争重试一次 再重试一次
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                  //case1.4
                 //条件一:如果数组长度大于cpu的数量那么扩容意向改成false表示不扩容false说明cells数组还可以扩容
                 //条件二为true表示其他线程已经扩容过了,当前线程Rehash重试
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                //case1.5
                //collide默认为false第一次进来并不会真正扩容,而是扩容意向为true,而是rehash只有再次rehash失败又到这里才会真正扩容
                else if (!collide)
                    collide = true;
                 //case1.6:扩容。条件一:cellBusy==0 true表示当前无锁状态,当前线程可以竞争这把锁;条件二:ture表示当前线程获取锁成功可以执行扩容逻辑,false当前时刻有其他线程执行扩容相关操作,当前线程rehash之后重试
                else if (cellsBusy == 0 && casCellsBusy()) {
    //(b)
                    try {
    
                       //cells==as?为什么已经加锁了还需要再次判断cells=as?假设线程1和线程2同时执行到到(b),线程1执行到cellsBusy之后线程2得到cpu的执行权,那么线程2执行完扩容逻辑,线程1继续执行如果没有(cells == as)则会扩容两次
                        if (cells == as) {
          // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
    
                    	//释放锁
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //case2:cells还没初始化,as为null,初始化cells数组
            //条件一:表示当前未加锁
            //条件二:其他线程没有修改cells数组
            //条件三:true表示获取锁成功,会将cellsBusy改为1;失败表示其他线程正在持有这把锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    //(1)
                boolean init = false;
                try {
                               // Initialize table
                	//cells==as?防止其他线程已经初始化了当前线程再次初始化丢失数据
                	//如果两个线程同时执行(1)第一个线程执行完cellsBusy == 0 && cells == as之后阻塞第二个线程执行并且初始化了数组并且释放锁,然后第一个线程继续执行casCellsBusy又会重新初始化数组会导致丢失数据
                    if (cells == as) {
    
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
    
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //case3:初始化Cell数组的时候,多个线程尝试CAS修改cellsBusy加锁的时候,失败的线程会走到这个分支
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

2.4 sum

需要强调的是sum返回的值不是非常精确的,因为在累加过程中可能有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容。

    public long sum() {
    
        Cell[] cs = cells;
        long sum = base;
        if (cs != null) {
    
            for (Cell c : cs)
                if (c != null)
                    sum += c.value;
        }
        return sum;
    }

3、一些细节问题

1、 cell数组的初始值为多少?

回答:是2
追问、为什么cell数组为2^n?
回答:位运算代替取余运算,因为计算线程在哪个cell时使用(n - 1) & h,实际上应该是h%(n-1)

2、Cell类为什么使用@sun.misc.Contended static final class Cell修饰?

回答:原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用 @sun.misc.Contended 注解对 Cell 类进行字节填充,这防止了数组中多个元素共享一个缓存行,发生伪共享问题。伪共享请参考链接https://blog.csdn.net/qq_41714995/article/details/130139318

3、LongAdder的结构是怎样的?

回答: base 变量和cell数组

4、当前线程应该访问Cell数组里面的哪一个Cell元素?

(n - 1) & h,其中的h是Thread里面的threadLocalRandomProbe

5、Cell数组如何扩容?

如果线程多次rehash之后还是cas失败,那么cell数组就会扩容,每次扩容之后的长度是上一次的2倍,并且cells数组长度小于等于cpu的个数。

6、线程访问分配的Cell元素有冲突后如何处理?

会rehash之后重新cas,多次尝试失败之后会扩容

7、如何保证线程操作被分配的Cell元素的原子性?

内部的value使用volatile进行修饰

8、为什么cells数组的长度不超过cpu的数量?

因为一个CPU同一时间只能执行一个线程。假设有16个CPU,有一个32长度的cells数组,那么同一时刻这台电脑最多占用16个cell,并发到16。剩下的16个cell就浪费空间了

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_41714995/article/details/130036144

智能推荐

python编码问题之encode、decode、codecs模块_python中encode在什么模块-程序员宅基地

文章浏览阅读2.1k次。原文链接先说说编解码问题编码转换时,通常需要以unicode作为中间编码,即先将其他编码的字符串解码(decode)成unicode,再从unicode编码(encode)成另一种编码。 Eg:str1.decode('gb2312') #将gb2312编码的字符串转换成unicode编码str2.encode('gb2312') #将unicode编码..._python中encode在什么模块

Java数据流-程序员宅基地

文章浏览阅读949次,点赞21次,收藏15次。本文介绍了Java中的数据输入流(DataInputStream)和数据输出流(DataOutputStream)的使用方法。

ie浏览器无法兼容的问题汇总_ie 浏览器 newdate-程序员宅基地

文章浏览阅读111次。ie无法兼容_ie 浏览器 newdate

想用K8s,还得先会Docker吗?其实完全没必要-程序员宅基地

文章浏览阅读239次。这篇文章把 Docker 和 K8s 的关系给大家做了一个解答,希望还在迟疑自己现有的知识储备能不能直接学 K8s 的,赶紧行动起来,K8s 是典型的入门有点难,后面越用越香。

ADI中文手册获取方法_adi 如何查看数据手册-程序员宅基地

文章浏览阅读561次。ADI中文手册获取方法_adi 如何查看数据手册

React 分页-程序员宅基地

文章浏览阅读1k次,点赞4次,收藏3次。React 获取接口数据实现分页效果以拼多多接口为例实现思路加载前 加载动画加载后 判断有内容的时候 无内容的时候用到的知识点1、动画效果(用在加载前,加载之后就隐藏或关闭,用开关效果即可)2、axios请求3、map渲染页面4、分页插件(antd)代码实现import React, { Component } from 'react';//引入axiosimport axios from 'axios';//引入antd插件import { Pagination }_react 分页

随便推点

关于使用CryPtopp库进行RSA签名与验签的一些说明_cryptopp 签名-程序员宅基地

文章浏览阅读449次,点赞9次,收藏7次。这个变量与验签过程中的SignatureVerificationFilter::PUT_MESSAGE这个宏是对应的,SignatureVerificationFilter::PUT_MESSAGE,如果在签名过程中putMessage设置为true,则在验签过程中需要添加SignatureVerificationFilter::PUT_MESSAGE。项目中使用到了CryPtopp库进行RSA签名与验签,但是在使用过程中反复提示无效的数字签名。否则就会出现文章开头出现的数字签名无效。_cryptopp 签名

新闻稿的写作格式_新闻稿时间应该放在什么位置-程序员宅基地

文章浏览阅读848次。新闻稿是新闻从业者经常使用的一种文体,它的格式与内容都有着一定的规范。本文将从新闻稿的格式和范文两个方面进行介绍,以帮助读者更好地了解新闻稿的写作_新闻稿时间应该放在什么位置

Java中的转换器设计模式_java转换器模式-程序员宅基地

文章浏览阅读1.7k次。Java中的转换器设计模式 在这篇文章中,我们将讨论 Java / J2EE项目中最常用的 Converter Design Pattern。由于Java8 功能不仅提供了相应类型之间的通用双向转换方式,而且还提供了转换相同类型对象集合的常用方法,从而将样板代码减少到绝对最小值。我们使用Java8 功能编写了..._java转换器模式

应用k8s入门-程序员宅基地

文章浏览阅读150次。1,kubectl run创建pods[root@master ~]# kubectl run nginx-deploy --image=nginx:1.14-alpine --port=80 --replicas=1[root@master ~]# kubectl get podsNAME READY STATUS REST...

PAT菜鸡进化史_乙级_1003_1003 pat乙级 最优-程序员宅基地

文章浏览阅读128次。PAT菜鸡进化史_乙级_1003“答案正确”是自动判题系统给出的最令人欢喜的回复。本题属于 PAT 的“答案正确”大派送 —— 只要读入的字符串满足下列条件,系统就输出“答案正确”,否则输出“答案错误”。得到“答案正确”的条件是: 1. 字符串中必须仅有 P、 A、 T这三种字符,不可以包含其它字符; 2. 任意形如 xPATx 的字符串都可以获得“答案正确”,其中 x 或者是空字符串,或..._1003 pat乙级 最优

CH340与Android串口通信_340串口小板 安卓给安卓发指令-程序员宅基地

文章浏览阅读5.6k次。CH340与Android串口通信为何要将CH340的ATD+Eclipse上的安卓工程移植到AndroidStudio移植的具体步骤CH340串口通信驱动函数通信过程中重难点还存在的问题为何要将CH340的ATD+Eclipse上的安卓工程移植到AndroidStudio为了在这个工程基础上进行改动,验证串口的数据和配置串口的参数,我首先在Eclipse上配置了安卓开发环境,注意在配置环境是..._340串口小板 安卓给安卓发指令