(juc系列)AQS源码学习笔记

前言

本文源码基于: JDK13

JUC是Java提供的一个并发工具包,提供了很多并发工具.

本文主要将AQS.

java.util.concurrent.locks.AbstractQueuedSynchronizer.

是一个基类,也可以理解为一个框架.

它提供了对于同步状态的控制,以前线程等待时的FIFO队列.

Fields

AQS的属性.

state

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

核心属性,同步状态. 使用volatile修饰.

与之对应的三个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState(){
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState){
state=newState;
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect,int update){
return STATE.compareAndSet(this,expect,update);
}

分别提供了get/set方法及CAS的赋值方法.

等待队列队头.

tail

等待队列队尾.

head 和 tail 是java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
的实例, 构成了一个双向链表.

Node

Node是为了表达一个等待线程而抽象的数据结构,主要有以下几个属性.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// Node节点所在的等待状态
volatile int waitStatus;

//前置节点
volatile Node prev;

// 后置节点
volatile Node next;

// 在这个节点上的线程
volatile Thread thread;

// 下一个等待的节点
Node nextWaiter;

他有两种模式,分别为共享模式及独占模式. 对应不同的操作.

其中waitStatus为枚举值,有以下几个值.

1
2
3
4
5
6
7
8
9
10
11
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED=1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL=-1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION=-2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE=-3;

Public-Methods

AQS的方法可太多了. 先看一下对外提供的API方法.

众所周知,AQS是为了同步(加锁)而设计的. 那么一定是有获取锁,释放锁的方法的.先从这里切入.

acquire(int arg)

独占式应用,典型的就是ReentrantLock. ReentrantLock源码学习

独占模式的加锁代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    /**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}

独占模式的获取锁, 并且忽略中断. 至少调用一次tryAcquire.如果成功了就返回.

否则的话将线程加入等待队列,重复的进行tryAcquire. 直到成功为止.

traAcquire(int arg)

这个方法在AQS中是抽象的, protected修饰. 由子类具体进行实现.

它定义的:

独占模式的获取锁, 如果可以获取到,返回成功,如果获取失败,线程应该被放入等待队列.

如果线程已经在等待队列中, 应该是被其他线程唤醒了.

总之: 这个方法是非阻塞的,立即返回的,要么成功加锁,返回true. 要么加锁失败,返回flase. ,之后的操作就不归这个方法管了.

addWaiter(Node node)

private方法,给当前线程创建一个Node并且放入等待队列.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    /**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode){
Node node=new Node(mode);

for(;;){
Node oldTail=tail;
if(oldTail!=null){
node.setPrevRelaxed(oldTail);
if(compareAndSetTail(oldTail,node)){
oldTail.next=node;
return node;
}
}else{
initializeSyncQueue();
}
}
}
  1. 创建一个Node.
  2. 如果队尾为空,说明等待队列没有初始化,进行初始化.
  3. 将当前节点设置为新的队尾.

acquireQueued(Node node, int arg)

一个final方法,子类无法重写.

将等待队列中的所有线程,进行获取锁的行为.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    /**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node,int arg){
boolean interrupted=false;
try{
for(;;){
final Node p=node.predecessor();
if(p==head&&tryAcquire(arg)){
setHead(node);
p.next=null; // help GC
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node))
interrupted|=parkAndCheckInterrupt();
}
}catch(Throwable t){
cancelAcquire(node);
if(interrupted)
selfInterrupt();
throw t;
}
}

如果当前节点的前置节点是头结点,说明当前节点是优先级最高的那个.尝试获取锁.

如果当前节点不是优先级最高的,或者获取锁失败了. 调用shouldParkAfterFailedAcquire.

shouldParkAfterFailedAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
int ws=pred.waitStatus;
if(ws==Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if(ws>0){
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do{
node.prev=pred=pred.prev;
}while(pred.waitStatus>0);
pred.next=node;
}else{
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws,Node.SIGNAL);
}
return false;
}

如果前置节点是SIGNAL.说明前置节点优先级更高,当前线程应该park.

如果前置节点被取消了,扔掉中间的取消节点. 不park.

如果前置节点是其他状态,设置为SIGNAL. 优先级最高. 不park.

不park的原因是再来一次. 检测一遍.

如果当前线程需要被park.则park且检查下是否中断了.

parkAndCheckInterrupt
1
2
3
4
5
6
7
8
9
/**
* Convenience method to park and then check if interrupted.
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}

如果发生异常,则取消掉这次获取锁.

cancelAcquire(Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
    /**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node){
// Ignore if node doesn't exist
if(node==null)
return;

node.thread=null;

// Skip cancelled predecessors
Node pred=node.prev;
while(pred.waitStatus>0)
node.prev=pred=pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary, although with
// a possibility that a cancelled node may transiently remain
// reachable.
Node predNext=pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus=Node.CANCELLED;

// If we are the tail, remove ourselves.
if(node==tail&&compareAndSetTail(node,pred)){
pred.compareAndSetNext(predNext,null);
}else{
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if(pred!=head&&
((ws=pred.waitStatus)==Node.SIGNAL||
(ws<=0&&pred.compareAndSetWaitStatus(ws,Node.SIGNAL)))&&
pred.thread!=null){
Node next=node.next;
if(next!=null&&next.waitStatus<=0)
pred.compareAndSetNext(predNext,next);
}else{
unparkSuccessor(node);
}

node.next=node; // help GC
}
}
  1. 当前node的thread设置为null.
  2. 扔掉当前节点之前的所有被取消了的节点.
  3. 取消掉当前节点。
  4. 设置尾节点为前一个节点.

release(int arg)

独占式的解锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    /**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg){
if(tryRelease(arg)){
Node h=head;
if(h!=null&&h.waitStatus!=0)
unparkSuccessor(h);
return true;
}
return false;
}

调用tryRelease(int arg). 如果解锁成功,唤醒头结点的后继节点. 如果解锁失败, 返回false.

tryRelease(int arg)

解锁操作,由子类负责具体实现,可以后期针对ReentrantLock学习.

这个方法,非阻塞式, 即时返回true/false. 代表是否释放成功.

unparkSuccessor(Node node)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    /**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node){
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws=node.waitStatus;
if(ws< 0)
node.compareAndSetWaitStatus(ws,0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s=node.next;
if(s==null||s.waitStatus>0){
s=null;
for(Node p=tail;p!=node&&p!=null;p=p.prev)
if(p.waitStatus<=0)
s=p;
}
if(s!=null)
LockSupport.unpark(s.thread);
}

在等待队列中,从后向前找到正序的第一个需要唤醒的Node. 执行unpark操作.

acquireShared(int arg)

共享锁的相关实现,可以查看CountDownLatch的相关代码. CountDownLatch源码解析

共享模式的获取锁.忽略中断.

至少调用一次TryAcquireShared, 如果成功就返回,失败就将线程加入等待队列. 重复调用TryAcquireShared知道成功.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    /**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg){
if(tryAcquireShared(arg)< 0)
doAcquireShared(arg);
}

tryAcquireShared(int arg)

抽象方法,由子类负责实现.

如果获取锁成功,直接返回. 如果获取失败,线程加入等待队列,如果线程已经加入,等待被其他人释放锁的动作唤醒.

doAcquireShared(int arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    /**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg){
final Node node=addWaiter(Node.SHARED);
boolean interrupted=false;
try{
for(;;){
final Node p=node.predecessor();
if(p==head){
int r=tryAcquireShared(arg);
if(r>=0){
setHeadAndPropagate(node,r);
p.next=null; // help GC
return;
}
}
if(shouldParkAfterFailedAcquire(p,node))
interrupted|=parkAndCheckInterrupt();
}
}catch(Throwable t){
cancelAcquire(node);
throw t;
}finally{
if(interrupted)
selfInterrupt();
}
}
  1. 首先添加一个SHARED模式的节点到等待队列.
  2. 如果当前节点的前置节点是head. 说明当前节点的优先级最高,尝试获取锁. 如果成功,则返回.
  3. 如果当前节点不是优先级最高的,或者获取锁失败了,调用shouldParkAfterFailedAcquire判断是否需要进行park. 如果需要,则park当前线程并检查中断.
  4. 如果不需要park.则自旋. 进行下一次判断,是否需要获取锁.
  5. 如果catch异常,则取消这次获取锁,流程同上面独占模式取消.

releaseShared(int arg)

共享模式的释放锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    /**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}

非阻塞式的释放锁.调用tryReleaseShared.

如果释放成功,调用doReleaseShared.如果失败,返回false.

tryReleaseShared(int arg)

抽象方法,具体由子类进行实现.

非阻塞式的,返回释放的结果.

doReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    /**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared(){
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for(;;){
Node h=head;
if(h!=null&&h!=tail){
int ws=h.waitStatus;
if(ws==Node.SIGNAL){
if(!h.compareAndSetWaitStatus(Node.SIGNAL,0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if(ws==0&&
!h.compareAndSetWaitStatus(0,Node.PROPAGATE))
continue; // loop on failed CAS
}
if(h==head) // loop if head changed
break;
}
}

共享模式的释放锁操作. 通知后继者并且确保传播.

独占式的解锁,只需要唤醒下一个即可。而共享式的解锁,需要广播解锁消息.

遍历等待队列,将SIGNAL的节点继任者全部唤醒.

完.

参考文章


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十