Java并发:深入浅出AQS之独占锁模式源码分析

简介: 作者:凌风郎少原文链接:https://mp.weixin.qq.com/s/0WxKOqfvq1kVJDgk6NwWlgAbstractQueuedSynchronizer(以下简称AQS)作为java.util.concurrent包的基础,它提供了一套完整的同步编程框架,开发人员只需要实现其中几个简单的方法就能自由的使用诸如独占,共享,条件队列等多种同步模式。

作者:凌风郎少
原文链接:https://mp.weixin.qq.com/s/0WxKOqfvq1kVJDgk6NwWlg
AbstractQueuedSynchronizer(以下简称AQS)作为java.util.concurrent包的基础,它提供了一套完整的同步编程框架,开发人员只需要实现其中几个简单的方法就能自由的使用诸如独占,共享,条件队列等多种同步模式。我们常用的比如ReentrantLock,CountDownLatch等等基础类库都是基于AQS实现的,足以说明这套框架的强大之处。鉴于此,我们开发人员更应该了解它的实现原理,这样才能在使用过程中得心应手。

总体来说个人感觉AQS的代码非常难懂,本文就其中的独占锁实现原理进行分析。

一、执行过程概述
首先先从整体流程入手,了解下AQS独占锁的执行逻辑,然后再一步一步深入分析源码。

获取锁的过程:
1、当线程调用acquire()申请获取锁资源,如果成功,则进入临界区。
2、当获取锁失败时,则进入一个FIFO等待队列,然后被挂起等待唤醒。
3、当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则进入临界区,否则继续挂起等待。

释放锁的过程:
1、当线程调用release()进行锁资源释放时,如果没有其他线程在等待锁资源,则释放完成。
2、如果队列中有其他等待锁资源的线程需要唤醒,则唤醒队列中的第一个等待节点(先进先出)。

二、源码深入分析
AQS核心实现
用一个双向链表来保存所有等待锁的Thread队列

链表中的每一个Node记录了一个线程以及其对应的等待锁的状态.

值得注意的是, 在AQS和Node的属性中各有一个state

AQS中的state

// 代表了当前锁的状态, 该锁即为队列中的所有Thread所抢占的锁,

// 注意, 这个state的取值是不受限制的, 不同于Node中的waitStatus, 这个state只有两种状态:

//0代表没有被占用,大于0代表有线程持有当前锁

private

volatile

int
state;

Node中的waitStatus

// 代表了当前Node所代表的线程的锁的等待状态

// 取值范围有限, 详见下文Node Field部分

volatile

int
waitStatus;

AQS Field
以下只列出几个重要的属性

// 头结点,大多数情况下就是当前持有锁的节点

private

transient

volatile

Node
head;

// 尾节点,每一个请求锁的线程会加到队尾

private

transient

volatile

Node
tail;

// 当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁

// 因为存在可重入锁的情况, 所以该值可能大于1

private

volatile

int
state;

// 代表当前持有独占锁的线程,在可重入锁中可以用这个来判断当前线程是否已经拥有了锁

// if (currentThread == getExclusiveOwnerThread()) {state++}

private

transient

Thread
exclusiveOwnerThread;
//继承自AbstractOwnableSynchronizer

Node Field
以下只列出几个重要的属性

static

final

class

Node
{

/* Marker to indicate a node is waiting in shared mode /

static

final

Node
SHARED =
new

Node
();

/* Marker to indicate a node is waiting in exclusive mode /

static

final

Node
EXCLUSIVE =
null
;

/* waitStatus value to indicate thread has cancelled /

static

final

int
CANCELLED =
1
;

/* waitStatus value to indicate successor's thread needs unparking /

static

final

int
SIGNAL = -
1
;

/* waitStatus value to indicate thread is waiting on condition /

static

final

int
CONDITION = -
2
;

/**

 * waitStatus value to indicate the next acquireShared should

 * unconditionally propagate

 */

static

final

int
PROPAGATE = -
3
;

volatile

int
waitStatus;

// 前置节点

volatile

Node
prev;

// 后置节点

volatile

Node
next;

// 节点所代表的线程

volatile

Thread
thread;

Node
nextWaiter;

}

acquire(int arg)
基于上面所讲的独占锁获取释放的大致过程,我们再来看下源码实现逻辑:
首先来看下获取锁的方法acquire()

public

final

void
acquire(
int
arg) {

if
(!tryAcquire(arg) &&

    acquireQueued(addWaiter(

Node
.EXCLUSIVE), arg))

    selfInterrupt();

}

代码虽然短,但包含的逻辑却很多,一步一步看下:

1、首先是调用开发人员自己实现的 tryAcquire() 方法尝试获取锁资源,如果成功则整个 acquire()方法执行完毕,即当前线程获得锁资源,可以进入临界区。

2、如果获取锁失败,则开始进入后面的逻辑,首先是 addWaiter(Node.EXCLUSIVE)方法。来看下这个方法的源码实现

addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的节点。

//注意:该入队方法的返回值就是新创建的节点

private

Node
addWaiter(
Node
mode) {

//基于当前线程,节点类型(Node.EXCLUSIVE)创建新的节点

//由于这里是独占模式,因此节点类型就是Node.EXCLUSIVE

Node
node =
new

Node
(
Thread
.currentThread(), mode);

Node
pred = tail;

//这里为了提搞性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾

if
(pred !=
null
) {

        node.prev = pred;

//这里根据CAS的逻辑,即使并发操作也只能有一个线程成功并返回,其余的都要执行后面的入队操作。即enq()方法

if
(compareAndSetTail(pred, node)) {

            pred.next = node;

return
node;

        }

    }

//上一步失败则通过enq入队。

    enq(node);

return
node;

}

enq(final Node node)
此方法用于将node加入队尾

//完整的入队操作

private

Node
enq(
final

Node
node) {

// CAS 自旋 ,直到成功加入队尾

for
(;;) {

Node
t = tail;

//如果队列还没有初始化,则进行初始化,即创建一个空的头节点

if
(t ==
null
) {

//同样是CAS,只有一个线程可以初始化头结点成功,其余的都要重复执行循环体

if
(compareAndSetHead(
new

Node
()))

            tail = head;

    } 

else
{

//新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点

        node.prev = t;

//基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功,其他的都必须重新执行循环体

if
(compareAndSetTail(t, node)) {

            t.next = node;

//该循环体唯一退出的操作,就是入队成功(否则就要无限重试)

return
t;

        }

    }

}

}

上面的入队操作有两点需要说明:

1、初始化队列的触发条件就是当前已经有线程占有了锁资源,因此上面创建的空的头节点可以认为就是当前占有锁资源的节点(虽然它并没有设置任何属性)。

2、注意 enq(finalNodenode)代码是,是一个经典的CAS自旋操作,直到成功加入队尾,否则一直重试。

经过上面的操作,我们申请获取锁的线程已经成功加入了等待队列,通过文章最一开始说的独占锁获取流程,那么节点现在要做的就是挂起当前线程,等待被唤醒,这个逻辑是怎么实现的呢?来看下源码:

acquireQueued(final Node node, int arg)
通过 tryAcquire()和 addWaiter(),如果线程获取资源失败,已经被放入等待队列尾部了。

如果线程获取资源失败,下一步进入等待状态休息,直到其他线程彻底释放资源后,唤醒自己再拿到资源,在等待队列中排队拿号,直到拿到号后再返回。(队列先进先出)

通过上面的分析,该方法入参node就是刚入队的包含当前线程信息的节点

final

boolean
acquireQueued(
final

Node
node,
int
arg) {

//锁资源获取失败标记位

boolean
failed =
true
;

try
{

//等待线程被中断标记位

boolean
interrupted =
false
;

//这个循环体执行的时机包括新节点入队和队列中等待节点被唤醒两个地方

//又是一个“自旋”

for
(;;) {

//获取当前节点的前置节点

final

Node
p = node.predecessor();

//如果前置节点就是头结点,则尝试获取锁资源

if
(p == head && tryAcquire(arg)) {

//当前节点获得锁资源以后设置为头节点,这里继续理解我上面说的那句话

//头结点就表示当前正占有锁资源的节点

                setHead(node);

                p.next = 

null
;
//setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!

//表示锁资源成功获取,因此把failed置为false

                failed = 

false
;

//返回中断标记,表示当前节点是被正常唤醒还是被中断唤醒

return
interrupted;

            }

//如果没有获取锁成功,则进入挂起逻辑

if
(shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true

                interrupted = 

true
;

        }

    } 

finally
{

//最后会分析获取锁失败处理逻辑

if
(failed)

            cancelAcquire(node);

    }

}

挂起逻辑是很重要的逻辑,这里拿出来单独分析一下,首先要注意目前为止,我们只是根据当前线程,节点类型创建了一个节点并加入队列中,其他属性都是默认值。

shouldParkAfterFailedAcquire(Node pred, Node node)
此方法主要用于检查状态,看看自己是否真的可以去休息了,进入 waiting状态

//首先说明一下参数,node是当前线程的节点,pred是它的前置节点

private

static

boolean
shouldParkAfterFailedAcquire(
Node
pred,
Node
node) {

//获取前置节点的waitStatus

int
ws = pred.waitStatus;

if
(ws ==
Node
.SIGNAL)

//如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起

return

true
;

if
(ws >
0
) {

//由waitStatus的几个取值可以判断这里表示前置节点被取消

do
{

            node.prev = pred = pred.prev;

        } 

while
(pred.waitStatus >
0
);

//这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点

//注,由于头结点head是通过new Node()创建,它的waitStatus为0,因此这里不会出现空指针问题,也就是说最多就是找到头节点上面的循环就退出了

        pred.next = node;

    } 

else
{

//根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断

        compareAndSetWaitStatus(pred, ws, 

Node
.SIGNAL);

    }

return

false
;

}

上面这个方法逻辑比较复杂,它是用来判断当前节点是否可以被挂起,也就是唤醒条件是否已经具备,即如果挂起了,那一定是可以由其他线程来唤醒的。该方法如果返回false,即挂起条件没有完备,那就会重新执行 acquireQueued()方法的循环体,进行重新判断,如果返回 true,那就表示万事俱备,可以挂起了,就会进入 parkAndCheckInterrupt()方法看下源码:

parkAndCheckInterrupt()
private

final

boolean
parkAndCheckInterrupt() {

LockSupport
.park(
this
);
//调用park()使线程进入waiting状态

//被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true

return

Thread
.interrupted();

}

注意: Thread.interrupted()会清除当前线程的中断标记位。

park()会让当前线程进入 waiting状态。在此状态下,有两种途径可以唤醒该线程:1,被 unpark();2,被 interrupt()

看 acquireQueued方法中的源码,如果是因为中断醒来,那么就把中断标记置为 true。

不管是正常被唤醒还是由与中断醒来,都会去尝试获取锁资源。如果成功则返回中断标记,否则继续挂起等待。

Thread.interrupted()方法在返回中断标记的同时会清除中断标记,也就是说当由于中断醒来然后获取锁成功,那么整个 acquireQueued方法就会返回 true

表示是因为中断醒来,但如果中断醒来以后没有获取到锁,继续挂起,由于这次的中断已经被清除了,下次如果是被正常唤醒,那么 acquireQueued方法就会返回 false,表示没有中断。

看了 shouldParkAfterFailedAcquire(Nodepred,Nodenode)和 parkAndCheckInterrupt(),现在让我们再回到 acquireQueued(finalNodenode,intarg),总结下该函数的具体流程:

节点进入队尾后,检查状态,是否可以被挂起去休息;

调用 park进入 waiting状态,等待 unpark()或 interrupt()唤醒自己;

被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

cancelAcquire(Node node)
最后我们回到 acquireQueued方法的最后一步, finally模块。这里是针对锁资源获取失败以后做的一些善后工作,翻看上面的代码,其实能进入这里的就是 tryAcquire()方法抛出异常,也就是说AQS框架针对开发人员自己实现的获取锁操作如果抛出异常,也做了妥善的处理,一起来看下源码:

//传入的方法参数是当前获取锁资源失败的节点

private

void
cancelAcquire(
Node
node) {

// 如果节点不存在则直接忽略

if
(node ==
null
)

return
;

    node.thread = 

null
;

// 跳过所有已经取消的前置节点,跟上面的那段跳转逻辑类似

Node
pred = node.prev;

while
(pred.waitStatus >
0
)

        node.prev = pred = pred.prev;

//这个是前置节点的后继节点,由于上面可能的跳节点的操作,所以这里可不一定就是当前节点,仔细想一下。^_^

Node
predNext = pred.next;

//把当前节点waitStatus置为取消,这样别的节点在处理时就会跳过该节点

    node.waitStatus = 

Node
.CANCELLED;

//如果当前是尾节点,则直接删除,即出队

//注:这里不用关心CAS失败,因为即使并发导致失败,该节点也已经被成功删除

if
(node == tail && compareAndSetTail(node, pred)) {

        compareAndSetNext(pred, predNext, 

null
);

    } 

else
{

int
ws;

if
(pred != head &&

            ((ws = pred.waitStatus) == 

Node
.SIGNAL ||

             (ws <= 

0&& compareAndSetWaitStatus(pred, ws,
Node
.SIGNAL))) &&

            pred.thread != 

null
) {

Node
next = node.next;

if
(next !=
null
&& next.waitStatus <=
0
)

//这里的判断逻辑很绕,具体就是如果当前节点的前置节点不是头节点且它后面的节点等待它唤醒(waitStatus小于0),

//再加上如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点

                compareAndSetNext(pred, predNext, next);

        } 

else
{

//进入这里,要么当前节点的前置节点是头结点,要么前置节点的waitStatus是PROPAGATE,直接唤醒当前节点的后继节点

            unparkSuccessor(node);

        }

        node.next = node; 

// help GC

    }

}

上面就是独占模式获取锁的核心源码,确实非常难懂,很绕,就这几个方法需要反反复复看很多遍,才能慢慢理解。

release(int arg)
接下来看下释放锁的过程:

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即 state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是 unlock()的语义,当然不仅仅只限于 unlock()。下面是 release()的源码:

public

final

boolean
release(
int
arg) {

if
(tryRelease(arg)) {

Node
h = head;

if
(h !=
null
&& h.waitStatus !=
0
)

            unparkSuccessor(h);

return

true
;

    }

return

false
;

}

tryRelease()方法是用户自定义的释放锁逻辑,如果成功,就判断等待队列中有没有需要被唤醒的节点(waitStatus为0表示没有需要被唤醒的节点),一起看下唤醒操作:

private

void
unparkSuccessor(
Node
node) {

//这里,node一般为当前线程所在的节点。

int
ws = node.waitStatus;

if
(ws <
0
)

//把标记为设置为0,表示唤醒操作已经开始进行,提高并发环境下性能

        compareAndSetWaitStatus(node, ws, 

0);

Node
s = node.next;

//如果当前节点的后继节点为null,或者已经被取消

if
(s ==
null
|| s.waitStatus >
0
) {

        s = 

null
;

//注意这个循环没有break,也就是说它是从后往前找,一直找到离当前节点最近的一个等待唤醒的节点

for
(
Node
t = tail; t !=
null
&& t != node; t = t.prev)

if
(t.waitStatus <=
0
)

                s = t;

    }

//执行唤醒操作

if
(s !=
null
)

LockSupport
.unpark(s.thread);
//唤醒

}

这个函数并不复杂。一句话概括:用 unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用 s来表示吧。此时,再和 acquireQueued()联系起来,s被唤醒后,进入 if(p==head&&tryAcquire(arg))的判断(即使 p!=head也没关系,它会再进入 shouldParkAfterFailedAcquire()寻找一个安全点。这里既然 s已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()的调整, s也必然会跑到 head的 next结点,下一次自旋 p==head就成立啦),然后 s把自己设置成 head标杆结点,表示自己已经获取到资源了, acquire()也返回了!!

三、总结

以上就是AQS独占锁的获取与释放过程,大致思想很简单,就是尝试去获取锁,如果失败就加入一个队列中挂起。释放锁时,如果队列中有等待的线程就进行唤醒。但如果一步一步看源码,会发现细节非常多,很多地方很难搞明白,我自己也是反反复复学习很久才有点心得,但也不敢说已经研究通了AQS,甚至不敢说我上面的研究成果就是对的,只是写篇文章总结一下,跟同行交流交流心得。
除了独占锁,后面还会产出AQS一系列的文章,包括共享锁,条件队列的实现原理等。

目录
相关文章
|
8天前
|
Java
Java中ReentrantLock释放锁代码解析
Java中ReentrantLock释放锁代码解析
25 8
|
8天前
|
Java 调度
Java中常见锁的分类及概念分析
Java中常见锁的分类及概念分析
14 0
|
1天前
|
Java
浅谈Java的synchronized 锁以及synchronized 的锁升级
浅谈Java的synchronized 锁以及synchronized 的锁升级
7 0
|
2天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
20 0
|
4天前
|
Java 程序员 编译器
Java中的线程同步与锁优化策略
【4月更文挑战第14天】在多线程编程中,线程同步是确保数据一致性和程序正确性的关键。Java提供了多种机制来实现线程同步,其中最常用的是synchronized关键字和Lock接口。本文将深入探讨Java中的线程同步问题,并分析如何通过锁优化策略提高程序性能。我们将首先介绍线程同步的基本概念,然后详细讨论synchronized和Lock的使用及优缺点,最后探讨一些锁优化技巧,如锁粗化、锁消除和读写锁等。
|
5天前
|
Java 编译器
Java并发编程中的锁优化策略
【4月更文挑战第13天】 在Java并发编程中,锁是一种常见的同步机制,用于保证多个线程之间的数据一致性。然而,不当的锁使用可能导致性能下降,甚至死锁。本文将探讨Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁降级等方法,以提高程序的执行效率。
12 4
|
6天前
|
存储 Java
模式匹配魔法:Java 21中switch语句的巨大进化
模式匹配魔法:Java 21中switch语句的巨大进化
9 0
|
12天前
|
安全 Java 调度
深入理解Java中的线程安全与锁机制
【4月更文挑战第6天】 在并发编程领域,Java语言提供了强大的线程支持和同步机制来确保多线程环境下的数据一致性和线程安全性。本文将深入探讨Java中线程安全的概念、常见的线程安全问题以及如何使用不同的锁机制来解决这些问题。我们将从基本的synchronized关键字开始,到显式锁(如ReentrantLock),再到读写锁(ReadWriteLock)的讨论,并结合实例代码来展示它们在实际开发中的应用。通过本文,读者不仅能够理解线程安全的重要性,还能掌握如何有效地在Java中应用各种锁机制以保障程序的稳定运行。
|
13天前
|
Java 编译器
Java并发编程中的锁优化策略
【4月更文挑战第5天】随着多核处理器的普及,并发编程在提高程序性能方面发挥着越来越重要的作用。在Java中,锁是实现并发控制的关键机制。本文将探讨Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁排序等技术,以提高程序的执行效率和降低资源争用。
|
23天前
|
算法 安全 Java
Java中的并发编程:理解并发性能优化
在当今软件开发领域,多核处理器的普及使得并发编程变得更加重要。本文将深入探讨Java中的并发编程,介绍并发性能优化的关键技术,帮助开发人员更好地利用多核处理器提升应用程序性能。