AQS 源码分析

noah2021

并发编程|2021-8-17|最后更新: 2025-2-4|
type
status
date
slug
summary
tags
category
icon
password

概念

AQS(AbstractQueuedSynchronizer,即队列同步器),它是构建锁或者其他同步组件(线程协作类)的基础框架, AQS解决了类实现同步器时涉及的大量细节问题,例如获取同步状态 state、FIFO 同步队列。
ReentrantLock 和 Semaphore,共同点是都只允许一定数量的线程通过。ReentrantLock 是 lock 方法和 tryLock方法,而 Semaphore 是 acquire 方法和 tryAcquire方法事实上,不仅是 ReentrantLock 和 Semaphore,包括 CountDownLatch、ReentrantReadWriteLock 都有这样类似的同步功能,它们底层都用到了 AQS
AQS 的整体架构图如下:
notion image
图中两个点需要说明一下:
  1. AQS 中队列只有两个:同步队列 + 条件队列,底层数据结构两者都是链表;
  1. 图中有四种颜色的线代表四种不同的场景,1、2、3 序号代表看的顺序。
  1. 私以为上面图有点问题:被菱形包起来的 ( 2 队列空或满 ) 应该改成 ( 是否满足设置的额外条件 ),如果满足则指向左上,否则右上
为什么有了同步队列,还需要条件队列?
主要是因为并不是所有场景一个同步队列就可以搞定的,在遇到锁 + 条件结合的场景时,就需要 Lock + Condition 配合才行,先使用 Lock 来决定哪些线程可以获得锁,哪些线程需要到同步队列里面排队阻塞;获得锁的多个线程在不满足条件的情况下 ( 假如设置了条件的话 ),可以使用 Condition 来管理这些线程,让这些线程阻塞等待,然后在合适的时机后,被正常唤醒 ( 和管程十分类似 )。从一定程度上来看,条件队列是对同步队列的场景功能补充。同步队列 + 条件队列联手使用的场景,最多被使用到锁 + 条件的场景中。
管程
notion image
在管程模型里,共享变量和对共享变量的操作是被封装起来的,图中最外层的框就代表封装的意思。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。这个过程类似就医流程的分诊,只允许一个患者就诊,其他患者都在门口等待。
假设有个线程 T1 执行出队操作,不过需要注意的是执行出队操作,有个前提条件,就是同步队列不能是空的,而同步队列不空这个前提条件就是管程里的条件变量。 如果线程 T1 进入管程后恰好发现队列是空的,此时线程 T1 就去 “队列不空” 这个条件变量的等待队列中等待。这个过程类似于大夫发现你要去验个血,于是给你开了个验血的单子,你就去验血的队伍里排队。线程 T1 进入条件变量的等待队列后,是允许其他线程进入管程的。这和你去验血的时候,医生可以给其他患者诊治,道理都是一样的。
再假设之后另外一个线程 T2 执行入队操作,入队操作执行成功之后,“队列不空” 这个条件对于线程 T1 来说已经满足了,此时线程 T2 要通知 T1,告诉它需要的条件已经满足了。当线程 T1得到通知后,会从等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。这个过程类似你验血完,回来找大夫,需要重新分诊。

ReentrantLock和AQS的关系

ReentrantLock 内部有一个 Sync 类,Sync 类继承了 AQS,不仅如此,Semaphore、CountDownLatch 也是这样
继承 AQS 的类
notion image
CountDownLatch:维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒
CyclicBarrier:线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。
Semaphore:类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。比如对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10

原理解析

类注释

  1. 提供了一种框架,自定义了先进先出的同步队列,让获取不到锁的线程能进入同步队列中排队
  1. 同步器有个状态字段,我们可以通过状态字段来判断能否得到锁,设计的关键在于通过把状态声明为 volatile,在锁里面修改状态值来保证线程安全的
  1. 子类可以通过给状态 CAS 赋值来决定能否拿到锁,可以定义那些状态可以获得锁,哪些状态表示获取不到锁(比如定义状态值是 0 可以获得锁,状态值是 1 就获取不到锁)
  1. AQS 提供了排它模式和共享模式两种锁模式。排它模式下:同一时刻只有一个线程可以获得锁,共享锁可以允许多个线程获得同一个锁,并且可以设置获取锁的线程数量。子类 ReadWriteLock 实现了两种模式
  1. AQS 实现了锁、排队、锁队列等框架,至于如何获得锁、释放锁的代码并没有实现,比如 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively 这些方法, AQS 中默认抛 UnsupportedOperationException 异常,都是需要子类去实现的
  1. 内部类 ConditionObject 可以被用作 Condition,Condition 提供了一种线程协作方式:一个线程被暂停执行,直到被其它线程唤醒
  1. AQS 继承 AbstractOwnableSynchronizer 是为了监控哪些线程持有了锁
  1. AQS 同步队列和条件队列,获取不到锁的节点在入队时是先进先出,但被唤醒时,可能并不会按照先进先出的顺序执行。

类属性

普通属性

state 的具体含义会根据具体实现类的不同而不同,比如在 Semaphore 里,它表示 ”剩余的许可证的数量”, 而在 CountDownLatch 里,它表示 “还需要倒数的数量”,在 ReentrantLock 中,state 用来表示锁的占有情况,包括可重入计数,当 state 的值为0的时候,标识该 Lock 不被任何线程所占有

同步队列属性

当多个线程都来请求锁时,某一时刻有且只有一个线程能够获得锁 ( 排它锁 ),那么剩余获取不到锁的线程,都会到同步队列中去排队并阻塞自己,当有线程主动释放锁时,就会从同步队列头开始释放一个排队的线程,让线程重新去竞争锁。 所以同步队列的主要作用阻塞获取不到锁的线程,并在适当时机释放这些线程。AQS 当中的同步等待队列也称 CLH 队列,是 FIFO 先入先出线程等待队列,Java 中的 CLH 队列是原 CLH 队列的一个变种,线程由原自旋机制改为阻塞机制
notion image

条件队列属性

条件队列不直接和锁打交道,但常常和锁配合使用,是一定的场景下,对锁功能的一种补充。Condition 是一个多线程间协调通信的工具类,使得某个或者某些线程一起等待某个条件 ( Condition ),只有当该条件具备时这些等待线程才会被唤醒从而重新进入同步队列排队。ConditionObject 是实现 Condition 接口的,Condition 接口相当于 Object 的各种监控方法,比如 Object#wait ()、Object#notify、Object#notifyAll 这些方法
notion image

Node

获取方法

acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现,acquire 方法制定了获取锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁。获取操作会依赖 state 变量,经常会阻塞 ( 比如获取不到锁的时候 ),在 Semaphore 中,获取就是 acquire 方法,作用是 “获取一个许可证” ;在 CountDownLatch 里面,获取就是 await 方法,作用是 “等待,直到倒数结束"
void acquire(int arg):独占式获取同步状态。如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步队列等待。该方法将会调用可重写的 tryAcquire(int arg) 方法 void acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态

acquire

以上代码的主要步骤是(流程见整体架构图中红色场景):
  1. 尝试执行一次 tryAcquire,如果成功直接返回,失败走 2;
  1. 线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾;
  1. 接着调用 acquireQueued 方法,两个作用,1:阻塞当前节点,2:节点被唤醒时,使其能够获得锁;
  1. 如果 2、3 执行失败了 ( 返回 true ),打断线程。

addWaiter

acquireQueued

获取独占锁的方法是 acquireQueued,主要做两件事情:
  1. 通过不断的自旋尝试使自己前一个节点的状态变成 signal,然后阻塞自己
  1. 获得锁的线程执行完成之后,释放锁时会把阻塞的 node ( 插入同步队列队尾的线程 Node ) 唤醒,node 唤醒之后再次自旋,尝试获得锁

acquireShared

acquireShared 整体流程和 acquire 相同,代码也很相似,接下来进行比较:
  1. 尝试获得锁的地方,有所不同,排它锁使用的是 tryAcquire 方法,共享锁使用的是 tryAcquireShared 方法
  1. 节点获得排它锁时,仅仅把自己设置为同步队列的头节点即可(setHead 方法),但如果是共享锁的话,一个节点成功获得共享锁后还会去唤醒自己的后续节点,一起来获得该锁 ( setHeadAndPropagate --> doReleaseShared 方法 )

释放方法

释放操作不会阻塞,在Semaphore 中,释放就是 release 方法,作用是 "释放一个许可证" ;CountDownLatch 里面,获取就是 countDown 方法,作用是 "倒数1个数”
boolean release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 boolean releaseShared(int arg):共享式释放同步状态

release

释放独占锁的方法是 release,主要分成两步:
  1. tryRelease 尝试释放当前独占锁,失败返回 false,成功走 2;
  1. 从同步队列队头开始,找它的下一个节点,如果下一个节点是空的,就会从尾开始,一直找到状态不是被取消 (1) 的节点,然后释放该节点

releaseShared

释放共享锁的方法是 releaseShared,主要分成两步:
  1. tryReleaseShared 尝试释放当前共享锁,失败返回 false,成功走 2;
  1. 唤醒当前节点的后续阻塞节点

需要同步类自己去实现的方法

  • Semaphore 里面的静态内部类 Sync 中的 getPermits、nonfairTryAcquireShared、tryReleaseShared 等
  • CountDownLatch 里面的静态内部类 Sync 中的 getCount、tryAcquireShared、tryReleaseShared 等
boolean tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态 boolean tryRelease(int arg):独占式释放同步状态。 int tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于 0 ,则表示获取成功;否则,获取失败。 boolean tryReleaseShared(int arg):共享式释放同步状态。

AQS应用实例&源码解析

AQS用法

  1. 写一个类,想好同步的逻辑,实现获取/释放方法;
  1. 内部写一个 Sync 类继承 AQS;
  1. 根据是否独占来重写,如果是独占的就重写 tryAcquire/tryRelease ;如果是共享的就重写 tryAcquireShared(int acquires) 和 tryReleaseShared(int releases) 等方法,在步骤1写的获取/释放方法中调用 Sync ( AQS ) 的获取/释放方法;

简化版CountDownLatch

Loading...