Java并发编程系列:深入分析AQS原理_aqs 队列获得锁之后为什么执行thread.currentthread().interrupt()-程序员宅基地

技术标签: Java杂货铺  


AQS又称为队列同步器,它是用来构建锁或其他同步组件的基础框架,它是实现ReentrangLock、Semaphore等同步工具的基础。本文将会详细的阐述AQS实现的细节问题。

数据结构定义

AQS内部通过int类型的state控制锁的状态,当state=0时,则说明没有任何线程占有共享资源的锁,当state>0时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。同步队列为FIFO的双向队列,竞争失败的线程会被添加至队尾。

// 同步队列的头部
private transient volatile Node head;
// 同步队列的尾部
private transient volatile Node tail;
// 同步状态
private volatile int state;

Node节点的定义:

//标识线程的状态
volatile int waitStatus;
//等待队列的前驱节点
volatile Node prev;
//等待队列的后继节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
//条件队列的等待节点
Node nextWaiter;
//判断当前节点是否是共享节点
final boolean isShared() {
    
 return nextWaiter == SHARED;
}
  • 1 CANCELLED:该节点的线程可能由于超时或被中断而处于被取消(作废)状态,一旦处于这个状态,节点状态将一直 处于CANCELLED,因此应该从队列中移除
  • -1 SIGNAL:表示该节点处于等待唤醒状态,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须唤醒其后继结点
  • -2 CONDITION:该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为0,重新进入阻塞状态
  • 0:新加入的节点

在锁的获取时,并不一定只有一个线程才能持有这个锁,所以此时有了独占模式和共享模式的区别,通过nextWaiter来区分。
还有一个点是公平锁和非公平锁,它是由子类来实现的。在ReentrantLock中有FairSync和NonFairSync来实现。

下面以ReentrantLock为例,解释锁的获取和释放流程。

获取锁

获取锁的流程为Lock.lock -> Sync.lock -> AQS.acquire -> Sync.tryAcquire -> AQS.addWaiter ->AQS.acquireQueued,我们按照这个流程逐步分析。TODO 流程图

# Lock.lock -> Sync.lock

获取锁可以通过ReentrantLock中的lock、lockInterruptibly、tryLock,此三个方法的意义在ReentrantLock的文章中已经详细阐述。

public void lock() {
    
    sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
    
    sync.acquireInterruptibly(1);
}
public boolean tryLock() {
    
    return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

可以看出内部都是通过sync来实现,抽象类Sync继承了AQS,并且Sync的实现类为FairSync和NonFairSync。因此调用根据构造方法实例化出的FairSync或着NonFairSync的lock方法:

// Fair
final void lock() {
    
    acquire(1);
}
// NonFair
final void lock() {
    
    // 以cas方式尝试将AQS中的state从0更新为1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

公平锁和非公平锁的lock方法这里就有区别,非公共锁先通过CAS操作去竞争锁,然后再去执行AQS实现的acquire方法。
exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用锁的线程。

# AQS.acquire -> Sync.tryAcquire

继续跟进AQS

public final void acquire(int arg) {
    
    if (tryAcquire(arg){
    
        return;
    }
    if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg))) {
    
        selfInterrupt();
    }
}

首先执行tryAcquire方法,由具体的子类实现,不同的子类有不同的实现方式,如果失败,表示该线程获取锁失败,就调用addWaiter方法,将当前线程加入到等待队列中,然后返回当前线程的node节点。将node节点传递给acquireQueued方法,如果node节点的前驱节点是头结点,就再次尝试获取到锁,如果获取锁成功(成功返回的是false不会执行selfInterrupt方法),就将该节点设置为头结点,如果获取失败,就将当前节点的线程挂起。

下面看非公平锁的tryAcquire实现:

// NonFair
protected final boolean tryAcquire(int acquires) {
    
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    
        // state为0,说明当前锁未被任何线程持有
        if (compareAndSetState(0, acquires)) {
    // CAS设置state,如果成功,则设置锁的拥有者为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果是重入得情况
    else if (current == getExclusiveOwnerThread()) {
    
        int nextc = c + acquires;
        setState(nextc);// 更改进入的次数
        return true;
    }
    return false;
}

这里公平锁和非公平锁的实现几乎相同,只是多了一个!hasQueuedPredecessors()判断条件,意思是当前同步队列中如果没有正在排队的线程,才会进行后续的步骤。

# addWaiter

如果执行到addWaiter,则说明前面的tryAcquire没有抢到锁,那么会将将节点加入到等待队列。这里需要注意前面提到独享锁和共享锁,
ReentrantLock属于独享锁,并且AQS通过Node节点也就是线程的封装来表示独享/共享,因此这里传入的Mode的参数为Node.EXCLUSIVE。

private Node addWaiter(Node mode) {
    
    // 将当前线程构造为等待节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
    // 如果尾节点部为空
        node.prev = pred;// 将当前节点添加至队列尾部
        if (compareAndSetTail(pred, node)) {
    
            pred.next = node;
            return node;
        }
    }
    // 如果尾节点为空,或者CAS操作失败,则通过死循环更新尾节点
    enq(node);
    return node;
}

enq方法没什么好说的,死循环+CAS,返回node的前驱节点

private Node enq(final Node node) {
    
    for (;;) {
    
        Node t = tail;
        if (t == null) {
     // 如果尾节点为空,那么初始化尾和头,头节点是一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
     // 
            node.prev = t;
            if (compareAndSetTail(t, node)) {
    
                t.next = node;
                return t;
            }
        }
    }
}
# acquireQueued

在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,
所以它会先进行自旋操作,尝试让该线程重新获取锁。代码如下:

final boolean acquireQueued(final Node node, int arg) {
    
    boolean failed = true;
    try {
    
        boolean interrupted = false;
        for (;;) {
    
            // 得到前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是head节点并且tryAcquire获取到锁
            if (p == head && tryAcquire(arg)) {
    
                setHead(node); // 设置Head为节点当前节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果当前节点前驱节点不是head或者CAS设置失败,去挂起线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() )
                interrupted = true;
        }
    } finally {
    
    	// 正常情况下failed = false,cancelAcquire的作用是删除节点,
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire()方法的作用是判断当前结点的前驱结点是否为SIGNAL状态,如果是则返回true。
如果为CANCELLED状态(值为1>0),即结束状态,则说明该前驱结点已没有用应该从同步队列移除,直到寻找到非CANCELLED状态的结点。倘若前驱结点的ws值不为CANCELLED,也不为SIGNAL(当从Condition的条件等待队列转移到同步队列时,结点状态为CONDITION因此需要转换为SIGNAL),那么将其转换为SIGNAL状态,以便在下轮循环中将其挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    
    int ws = pred.waitStatus;// 前驱节点的状态
    if (ws == Node.SIGNAL)// 如果是等待唤醒状态,返回true,
        return true;
    if (ws > 0) {
     // >0 则为CNACLE状态,被取消了,需要将前驱节点移除
        do {
    
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    // 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        return Thread.interrupted();
}

parkAndCheckInterrupt()方法挂起当前线程,需要等待一个unpark()操作来唤醒它,调用interrupte方法可以中断,稳定后列表的状态为

  • 除了头节点,剩余节点都被阻塞,线程处于WAITING状态。
  • 除了尾节点,剩余节点都满足waitStatus==SIGNAL,表示释放后需要唤醒后继节点。

到此ReetrantLock内部间接通过AQS的FIFO的同步队列就完成了lock()操作。一张图总结lock的流程:
lock

释放锁

下面继续看unLock的流程:

public final boolean release(int arg) {
    
   if (tryRelease(arg)) {
    
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

释放锁其实就两个步骤,1.释放锁,2.如果完全释放则唤醒等待的线程。先看释放锁:

protected final boolean tryRelease(int releases) {
    
    int c = getState() - releases;// 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    // 完全释放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

完全释放表示ownerThread的所有重入操作均已结束,接着是唤醒后面的线程,注意这里并没有将head置为null,只是将ExclusiveOwnerThread和state初始化。

private void unparkSuccessor(Node node) {
           
    int ws = node.waitStatus;
    if (ws < 0) // 正常情况下为-1
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    // 如果下一个节点为空或者被取消,继续从尾节点开始找离头结点最近的
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)// 状态<0 的节点
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);// 唤醒下一个节点
}

当最近可用的节点被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,继续开始自旋。

Condition实现原理

在 https://blog.csdn.net/TheLudlows/article/details/76962006 中介绍了Condition的用法,类似于Object的wait和notify。

# await

Condition接口提供了await、signal方法,它的实现为AQS的内部类ConditionObject,在它的内部也有一个队列,称为等待队列,单向列表实现。AQS内部的队列叫做同步队列。同步队列主要用来保存阻塞的线程,而等待队列用来保存调用了await方法的线程。ConditionObject的成员变量如下:

 public class ConditionObject implements Condition, java.io.Serializable {
    
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
}

等待队列的元素和同步队列中的元素都是Node类型。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。

等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。

下面分析await方法的逻辑:

public final void await() throws InterruptedException {
    
    if (Thread.interrupted())
        throw new InterruptedException();
    // 构建为Node节点,并加入队尾
    Node node = addConditionWaiter();
    // 释放当前线程锁即释放同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    // 阻塞,直到收到信号或被中断
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 唤醒之后执行,同时判断线程是否被中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

添加到等待队列:

private Node addConditionWaiter() {
    
    Node t = lastWaiter;
    // lastWaiter初始化为null
    // 清除被唤醒的node
    if (t != null && t.waitStatus != Node.CONDITION) {
    
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 新建node,状态为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)// 初始化队列
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

尽管此处没有任何线程安全的保护,但实际使用时不会出现任何线程安全问题——因为条件队列的使用要求我们在调用await或signal时持有与该条件队列唯一相关的锁。共享锁中没有实现Lock接口,因此没有newCondition方法。

final int fullyRelease(Node node) {
    
    boolean failed = true;
    try {
    
        int savedState = getState();
        if (release(savedState)) {
    // 上节讲过的release,注意参数
            failed = false;
            return savedState;// 返回状态
        } else {
    
            throw new IllegalMonitorStateException();
        }
    } finally {
    
        if (failed)// 正常情况不会进入此分支
            node.waitStatus = Node.CANCELLED;
    }
}
# signal
public final void signal() {
    
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal()方法做了两件事,一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。二是将等待队列的头节点从等待队列中删除,同时将它加入到同步队列,意思是次线程可以去竞争锁了。

private void doSignal(Node first) {
    
   do {
    
   	   firstWaiter = first.nextWaiter;// 移除首节点
       if (firstWaiter == null)
           lastWaiter = null;
       first.nextWaiter = null; // 将旧的首节点next属性置为null
       
   } while (!transferForSignal(first) && (first = firstWaiter) != null);
}     

transferForSignal 将 first节点移出等待队列,通过时修改状态,加入同步队列,根据在同步队列中的前驱节点的状态和来决定是否唤醒等待阻塞的线程。

final boolean transferForSignal(Node node) {
    
	// 设置节点状态为初始状态
   if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);// 加入同步队列,得到前驱节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

其实这里不唤醒阻塞的线程也是可以的,因为此线程已经加入到同步队列中,同步队列中等待的线程是通过前驱节点的来唤醒的。但是这里为什么要多次一举?能够进入此分支说明前驱节点是CANCELLED状态,那么说明当前节点距离Head又进了一步,早些将此CANCELLED节点清除,因此将次线程唤醒,去竞争锁,同时删除无效的节点。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/TheLudlows/article/details/88696874

智能推荐

vue引入原生高德地图_前端引入原生地图-程序员宅基地

文章浏览阅读556次,点赞2次,收藏3次。由于工作上的需要,今天捣鼓了半天高德地图。如果定制化开发需求不太高的话,可以用vue-amap,这个我就不多说了,详细就看官网 https://elemefe.github.io/vue-amap/#/zh-cn/introduction/install然而我们公司需要英文版的高德,我看vue-amap中好像没有这方面的配置,而且还有一些其他的定制化开发需求,然后就只用原生的高德。其实原生的引入也不复杂,但是有几个坑要填一下。1. index.html注意,引入的高德js一定要放在头部而_前端引入原生地图

ViewGroup重写大法 (一)-程序员宅基地

文章浏览阅读104次。本文介绍ViewGroup重写,我们所熟知的LinearLayout,RelativeLayout,FrameLayout等等,所有的容器类都是ViewGroup的子类,ViewGroup又继承View。我们在熟练应用这些现成的系统布局的时候可能有时候就不能满足我们自己的需求了,这是我们就要自己重写一个容器来实现效果。ViewGroup重写可以达到各种效果,下面写一个简单的重写一个Vi..._viewgroup 重写

Stm32学习笔记,3万字超详细_stm32笔记-程序员宅基地

文章浏览阅读1.8w次,点赞279次,收藏1.5k次。本文章主要记录本人在学习stm32过程中的笔记,也插入了不少的例程代码,方便到时候CV。绝大多数内容为本人手写,小部分来自stm32官方的中文参考手册以及网上其他文章;代码部分大多来自江科大和正点原子的例程,注释是我自己添加;配图来自江科大/正点原子/中文参考手册。笔记内容都是平时自己一点点添加,不知不觉都已经这么长了。其实每一个标题其实都可以发一篇,但是这样搞太琐碎了,所以还是就这样吧。_stm32笔记

CTS(13)---CTS 测试之Media相关测试failed 小结(一)_mediacodec框架 cts-程序员宅基地

文章浏览阅读1.8k次。Android o CTS 测试之Media相关测试failed 小结(一)CTSCTS 即兼容性测试套件,CTS 在桌面设备上运行,并直接在连接的设备或模拟器上执行测试用例。CTS 是一套单元测试,旨在集成到工程师构建设备的日常工作流程(例如通过连续构建系统)中。其目的是尽早发现不兼容性,并确保软件在整个开发过程中保持兼容性。CTS 是一个自动化测试工具,其中包括两个主要软件组件:CTS tra..._mediacodec框架 cts

chosen.js插件使用,回显,动态添加选项-程序员宅基地

文章浏览阅读4.5k次。官网:https://harvesthq.github.io/chosen/实例化$(".chosen-select").chosen({disable_search_threshold: 10});赋值var optValue = $(".chosen-select").val();回显1.设置回显的值$(".chosen-select").val(“opt1”);2.触发cho..._chosen.js

C++ uint8_t数据串如何按位写入_unit8_t 集合 赋值 c++-程序员宅基地

文章浏览阅读1.9k次。撸码不易,网上找不到,索性自己写,且撸且珍惜!void bitsWrite(uint8_t* buff, int pos, int size, uint32_t value){ uint32_t index[] = { 0x80000000, 0x40000000, 0x20000000, 0x10000000, 0x8000000, 0x4000000, 0x2000000, 0x1000000, 0x800000, 0x400000, 0_unit8_t 集合 赋值 c++

随便推点

Javaweb框架 思维导图_javaweb框架图-程序员宅基地

文章浏览阅读748次。javaweb知识点_javaweb框架图

adb的升级与版本更新_adb iptabls怎么升级-程序员宅基地

文章浏览阅读1.1w次,点赞3次,收藏16次。adb是没有自动升级的命令的,如果想要更新adb的版本,我们可以在网上找到自己想要的版本进行更新给大家提供几个版本https://pan.baidu.com/s/1yd0dsmWn5CK08MlyuubR7g&shfl=shareset 提取码: 94z81、下载解压后我们可以找到下面几个文件,并复制2、找到adb安装的文件夹下的platform-tools文件夹,我这里是..._adb iptabls怎么升级

微信苹果版删除所有的聊天记录的图文教程_mac微信怎么删除聊天列表-程序员宅基地

文章浏览阅读3.8k次。很多用户可能都知道怎么在Windows系统上删除微信的聊天记录,那么苹果电脑上的微信软件怎么删除所有的聊天记录呢?下面小编就专门来给大家讲下微信mac版删除所有的聊天记录的图文教程。点击后会弹出提示窗口,点击这里的确认按钮就可以将其清理掉了。在这里选择要清理的数据,然后点击下方右边的清理按钮就行了。在mac上打开微信后,点击左下角的横线图标。然后再点击这里的管理微信聊天数据按钮。打开了设置窗口,点击上方的“通用”。在这里点击下方的前往清理按钮。点击弹出菜单里的“设置”。_mac微信怎么删除聊天列表

【报错笔记】数据类型转换时报错:Request processing failed;nested exception is java.lang.NumberFormatException:..._request processing failed; nested exception is jav-程序员宅基地

文章浏览阅读7.7k次。数据类型转换时报错:Request processing failed;nested exception is java.lang.NumberFormatException:For input String “20151512345”报错原因:数字格式异常,接着后面有 For input string: “201515612343” 提示,这就告诉我们你当前想把 “201515612343” 转换成数字类型时出错了。解决方案:使用2015151612343这个数字太大了,所以直接使用string_request processing failed; nested exception is java.lang.numberformatexcepti

qml 自定义消息框_Qt qml 自定义消息提示框-程序员宅基地

文章浏览阅读387次。版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本文链接:https://blog.csdn.net/a844651990/article/details/78376767Qt qml 自定义消息提示框QtQuick有提供比较传统的信息提示框MessageDialog,但是实际开发过程并不太能满足我们的需求。下面是根据controls2模块中..._qml 自定义 messagedialog

Redis.conf 默认出厂内容_默认出厂的原始redis.conf文件全部内容-程序员宅基地

文章浏览阅读599次。# Redis configuration file example.## Note that in order to read the configuration file, Redis must be# started with the file path as first argument:## ./redis-server /path/to/redis.conf # Note on units: when memory size is needed, it is pos._默认出厂的原始redis.conf文件全部内容

推荐文章

热门文章

相关标签