AbstractQueuedSynchronizer(二)
AQS,AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架,如ReentrantLock、ReentrantReadWriteLock、Semaphore等,它是JUC并发包中的核心基础组件。
AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。AQS使用一个int类型的成员变量state来代表共享资源和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对state进行操作,当然AQS可以确保对state的操作是安全的。
AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。AbstractQueuedSynchronizer类的数据结构如下:

AbstractQueuedSynchronizer类底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
类定义
1 | public abstract class AbstractQueuedSynchronizer |
内部类Node
1 | static final class Node { |
每个被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。CANCELLED表示当前的线程被取消;SIGNAL表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作;CONDITION表示当前节点在等待condition,也就是在condition queue中;PROPAGATE表示当前场景下后续的acquireShared能够得以执行;值0表示当前节点在sync queue中,等待着获取锁。
acquire函数
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:
1 | public final void acquire(int arg) { |
首先调用tryAcquire函数,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它并则直接返回。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此函数完成自己的逻辑。
若tryAcquire失败,则调用addWaiter函数,addWaiter函数完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
调用acquireQueued函数,此函数完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
首先分析addWaiter(Node.EXCLUSIVE):
1 | // 添加等待者 |
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。那么该线程下一步即进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源。即acquireQueued 函数的作用:
1 | // sync队列中的结点在独占且忽略中断的模式下获取(资源) |
首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取资源,代表该当前节点能够占有锁,设置头结点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函数,shouldParkAfterFailedAcquire函数如下:
1 | // 当获取(资源)失败后,检查并且更新结点状态 |
只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt函数,源码如下
1 | // 进行park操作并且返回该线程是否被中断 |
parkAndCheckInterrupt函数里的逻辑是首先执行park操作,然后返回该线程是否已经被中断。park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:被unpark()或被interrupt()。
再看final块中的cancelAcquire函数,其源码如下:
1 | // 取消继续获取(资源) |
该函数完成的功能就是取消当前线程对资源的获取,即设置该结点的状态为CANCELLED,接着再看unparkSuccessor函数,此方法用于唤醒等待队列中下一个线程,源码如下:
1 | // 释放后继结点 |
acquire()的过程即先调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
release函数
1 | public final boolean release(int arg) { |
tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头结点不为空并且头结点的状态不为0,则释放头结点的后继结点,unparkSuccessor函数已经分析过,不再累赘。
对于其他函数我们也可以分析,与前面分析的函数大同小异,所以,不再累赘。
acquireShared函数
此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:
1 | public final void acquireShared(int arg) { |
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
所以这里acquireShared()的流程就是:tryAcquireShared()尝试获取资源,成功则直接返回;失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
releaseShared函数
此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:
1 | public final boolean releaseShared(int arg) { |
此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。
AbstractQueuedSynchronizer中还有内部类ConditionObject,表示Condition Queue的数据结构,由于篇幅原因暂不展开了。
参考阅读:
http://blog.csdn.net/zero__007/article/details/51570276
http://www.cnblogs.com/leesf456/p/5350186.html
http://www.cnblogs.com/waterystone/p/4920797.html