(juc系列)Phaser源码学习

本文源码基于: JDK13

简介

老规矩,下面是官方注释的简单翻译版本,追求速度,都不一定通顺. 谨慎阅读.

一个可复用的同步屏障,功能上类似于CyclicBarrierCountDownLatch,但是支持更多灵活的用法.

登记

与其他同步屏障不同的是,Phaser的数量是可以各自不同的. 使用方应该使用register或者bulkRegister来进行注册.或者以构造方法的形式初始化数量。 然后在一些节点到达后可以进行取消注册.

与大多数基本的同步器构造方法一样,注册和取消注册仅影响内部计数. 他们不记录任何内部的名单, 任务无法查询他们是否已经登记了.

CountDownLatchCyclicBarrier,Semaphore等等都是指定数量后不能变化的,而Phaser的注册数量是可以随时变化的,因此更加灵活.

同步

CyclicBarrier一样,Phaser支持重复调用awaited. arriveAndAwaitAdvanceCyclicBarrier.await的作用类似.

Phaser的每一代拥有一个关联的编号. Phaser的阶段编号从零开始,所有的参与者到达后,阶段编号增加。到达int的最大值后,回归为0.

阶段编号可以独立的控制到达行为和等待行为,任何注册方可以调用以下两种方法:

  • arrival

arrivearriveAndDeregister两个方法记录到达. 这两个方法不阻塞,但是返回关联的到达阶段编号.

指定阶段编号的最后一个参与者到达,一个可选的行为会被执行,然后Phaser进行升级.

这两个操作由触发阶段升级的最后一个参与者触发,并由重写的onAdvance方法负责控制. 这个方法也负责控制终止,
重写这个方法和CyclicBarrier的屏障行为很相似,但是更加灵活一些.

  • waiting 等待

awaitAdvance要求一个参数,表示到达阶段的编号,或者当一个阶段升级到另一个不同的阶段时返回.
CyclicBarrier的方法不一样,awaitAdvance方法继续等待,直到等待线程被中断. 可中断和带有超时的版本也是支持的. 但是超时或者中断了并不会影响Phaser的状态.

如果必要,你可以自己执行相关的恢复操作, 在调用forceTermination之后. 阶段还被用来执行ForkJoinPool.

终止

一个phaser可以进入终止状态, 使用isTerminated方法来检查. 如果终止了,所有的同步方法立即返回,不再等待. 返回一个负数值来表名这点.

相似的,在终止后尝试进行注册,也不会有反应. 当调用onAdvance返回true时, 终止被触发.
如果一个取消注册的行为,让注册数量为0了, 将会终止.

分层

Phasers可以分层以减少竞争(比如以树状结构初始化). 设置有较大数量的Phasers将会有比较严重的同步竞争,可以使用一组子Phaser共享一个公共的父节点,
来避免这种情况. 这将大大的提升吞吐量即使会导致每一个操作的浪费变大.

在一个分层phaser的树中, 子节点的注册和取消注册是自动管理的, 如果注册的数量变为非零值,子节点将注册至其父节点, 如果注册数量变为0. 子节点将从其父节点取消注册.

可以查看下方分层的示例来了解.

因为支持分层,因此一个Phaser有三种形态.

  • 非树形,单个节点

这是最简单的形态,只要自身的注册数等于到达数,就升级一次阶段编号即可.

  • 树形,叶子节点

只要自身的注册数量等于到达数量,就代表自己这个节点“到达”了,向父节点的到达数+1.

  • 树形,非叶子节点

自身到达数等于注册数,这里的到达数不是参与的任务数,而是自己的子节点的数量,自己的所有子节点全部到达,自己才算到达,向自己的父节点进行”到达”操作。
如果这个节点是根节点,那么整个Phaser树才算是全部到达,进行升级操作.

monitoring 监控

即使同步方法只能由注册方进行调用,一个phaser的当前状态可以被任何调用方监控. 在一个给定的时间,getRegisteredParties返回总数,
getArriveParties返回到达的数量. getUnarrivedParties返回没有到达的数量. 这些方法返回值都是瞬态的,因此可能在同步控制中不是特别有用. toString方法返回这些状态的一个快照.

简单示例

代替CountDownLatch

Phaser可以用来替换掉CountDownLatch. 控制一个行为, 服务于一些部分.
通常的操作是, 设置当前线程为第一个注册者, 然后启动所有的行为,之后取消注册当前线程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void runTasks(List<Runnable> tasks) { 
// 此时注册数量为1
Phaser startingGate = new Phaser(1); // "1" to register self
// create and start threads
for (Runnable task : tasks) {
startingGate.register();
new Thread(() -> {
startingGate.arriveAndAwaitAdvance();
task.run();
}).start();
}

// deregister self to allow threads to proceed
startingGate.arriveAndDeregister();
}
  1. 注册当前线程(此时注册数量为1)
  2. 启动所有线程,首先注册一次(全部完成后,此时注册数量为tasks.size() + 1) ,之后让他们arriveAndAwaitAdvance. 到达并且等待升级(此时到达数量为tasks.size()`.
  3. 取消注册当前线程(注册数量变成tasks.size()), Phaser的注册数量等于到达数量。因此进行升级,所有等待的线程唤醒,继续执行任务.

重复执行一组任务指定次数

让一组线程,重复执行某些行为一定的次数,可以重写onAdvance.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void startTasks(List<Runnable> tasks, int iterations) {
Phaser phaser = new Phaser() {
// 终止条件, 阶段编号大于等于给定循环次数减1. 其实就是只能循环给定次数
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations - 1 || registeredParties == 0;
}
};
// 注册一个
phaser.register();
for (Runnable task : tasks) {
// 注册`tasks.size()`个
phaser.register();
new Thread(() -> {
do {
task.run();
// 等待升级
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
// allow threads to proceed; don't wait for them
// 取消注册,开始所有任务
phaser.arriveAndDeregister();
}
  1. 初始化一个Phaser,并重写onAdvance. 让阶段编号大于给定次数时,Phaser进行终止.
  2. 当前线程注册. (此时注册数量为1)
  3. 每个任务线程,注册一次. (此时注册数量为tasks.size() + 1)
  4. 如果Phaser没有终止,其他所有线程执行任务, 然后等待 (此时到达数量为tasks.size().
  5. 当前线程取消注册,让注册数等于等待数,其他线程等待结束,进行升级或者终止.

让所有任务互相等待,以完成一组任务,整体完成给定次数后,Phaser终止,程序结束.

等待终止

如果主任务必须在终止后发生,他可以注册然后执行一个相似的循环.

1
2
3
4
// ...
phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance();

首先进行注册,然后在Phaser没有终止前,不断的到达,等待升级.知道Phaser终止了,再进行主任务的执行.

等待特定的阶段编号

如果你确定在你的上下文中,Phaser的数量不会超过int的最大值,你可以使用这些相关的构造器来等待特定的某个阶段编号.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void awaitPhase(Phaser phaser, int phase) {
// 注册一次
int p = phaser.register(); // assumes caller not already registered
// 不断等待
while (p < phase) {
if (phaser.isTerminated())
// ... deal with unexpected termination
else
// 阶段升级
p = phaser.arriveAndAwaitAdvance();
}
// 到达指定编号,开始干活
phaser.arriveAndDeregister();
}

分层的示例

上面讲到Phaser支持分层以获得更好的并发性,这是一个简单的例子.

创建一组任务,使用一个树形的Phasers. 假设一个Task的类,他的构造参数接受一个Phaser. 在调用下方代码的build之后,这些任务会开始.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void build(Task[] tasks, int lo, int hi, Phaser ph) {
// 如果任务数量大于单个Phaser最大的任务数,说明需要拆分
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
// 递归调用build, 传入一个新的子Phaser.
build(tasks, i, j, new Phaser(ph));
}
} else {
// 任务数可以由一个Phaser控制
for (int i = lo; i < hi; ++i)
// 创建任务,绑定到当前的Phaser上
tasks[i] = new Task(ph);
// assumes new Task(ph) performs ph.register()
}
}

TASKS_PER_PHASER 的最佳值取决于你期望的同步效率. 越小的值,会让每个阶段的执行块变小,因此速率高. 如果需要更大的执行快,可以设置为高达几百.

注意事项

实现控制最大的参与者数量为65535. 如果尝试去注册更多,会导致错误.
但是你可以通过使用树形的Phasers来实现更多的参与者.

源码阅读

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* Creates a new phaser with no initially registered parties, no
* parent, and initial phase number 0. Any thread using this
* phaser will need to first register for it.
*/
public Phaser() {
this(null, 0);
}

/**
* Creates a new phaser with the given number of registered
* unarrived parties, no parent, and initial phase number 0.
*
* @param parties the number of parties required to advance to the
* next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/
public Phaser(int parties) {
this(null, parties);
}

/**
* Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
*
* @param parent the parent phaser
*/
public Phaser(Phaser parent) {
this(parent, 0);
}

/**
* Creates a new phaser with the given parent and number of
* registered unarrived parties. When the given parent is non-null
* and the given number of parties is greater than zero, this
* child phaser is registered with its parent.
*
* @param parent the parent phaser
* @param parties the number of parties required to advance to the
* next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/
public Phaser(Phaser parent, int parties) {
// 高16位有值,异常,初始化时不可以已有参与者
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
// 当前阶段置为0.初始值
int phase = 0;
this.parent = parent;
// 给定的父节点不为空, 是一个树形的Phaser.
if (parent != null) {
// 共享同一个root节点,还有所有节点共享队列
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
// 如果参与者不为0,当前节点是一个有效的节点,向当前节点的父节点,注册一个参与者,代表(当前节点需要父节点等待)
if (parties != 0)
phase = parent.doRegister(1);
}
else {
// 父节点为空,当前是孤立的,非树形的Phaser.
// 赋值一些属性
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
// 初始化状态,如果没有参与者,赋值为EMPTY=1
// 如果有, state=高32位记录阶段号,16-32位记录参与者数量,低16记录没有到达的数量. 初始化的时候,参与者数量和没有到达的数量是一致的
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}

共提供了4个构造方法,本质上都是调用最后一个. 详情看注释.

主要是区分是否是树形,然后对State,父节点,等待队列等进行初始化赋值.

变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 内部的状态定义, 核心属性
private volatile long state;

// 一些常量
private static final int MAX_PARTIES = 0xffff; // 最大参与者数量
private static final int MAX_PHASE = Integer.MAX_VALUE; // 最大阶段数量
private static final int PARTIES_SHIFT = 16; // 参与者占用的位数
private static final int PHASE_SHIFT = 32; // 阶段占用的位数
private static final int UNARRIVED_MASK = 0xffff; // to mask ints // 掩码,计算没有到达的数量
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs // 掩码,计算参与者的数量
private static final long COUNTS_MASK = 0xffffffffL; // 掩码,计数
private static final long TERMINATION_BIT = 1L << 63; // 是否终止的bit位

// some special values
private static final int ONE_ARRIVAL = 1; // 到达的Unit值
private static final int ONE_PARTY = 1 << PARTIES_SHIFT; // 一个参与者的unit值
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; // 一个注销. 操作等于 “参与者减一,同时未到达树也减一”
private static final int EMPTY = 1; // 空, 参与者为0,未到达为1. 方便辨认

// 当前Phaser的父节点
private final Phaser parent;

/**
* The root of phaser tree. Equals this if not in a tree.
*/
// 当前Phaser的根节点.
private final Phaser root;

/**
* Heads of Treiber stacks for waiting threads. To eliminate
* contention when releasing some threads while adding others, we
* use two of them, alternating across even and odd phases.
* Subphasers share queues with root to speed up releases.
*/
// 等待线程栈的头结点.
// 根据奇偶数使用不同的Phaser.
// 子节点共享相同的两个栈.
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

这是一些变量和常量.

  • State 状态定义
  • parent 父节点
  • root 根节点
  • evenQ 等待线程栈的偶数版本
  • addQ 等待线程栈的奇数版本

其他还有一些常量,主要是用来辅助对于State的定义的,比较常见的一些shift,one等等,不再介绍.

QNode

内部的等待节点. 定义结构比较简单, 主要是保存了当前的Phaser信息和对应的线程信息,以及一个指向下一个节点的next指针.

提供了两个方法.

isReleasable 是否可释放

如果内部的信息有一些不对劲, 比如线程为空,或者被中断了, 或者Phaser被别人改了,等等, 都返回true. 否则返回false. 支持中断和超时.

block 阻塞等待

根据是否超时, 阻塞当前线程一段时间.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static final class QNode implements ForkJoinPool.ManagedBlocker {
// 等待的阶段
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait
QNode next;

QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}

public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
thread = null;
return true;
}
return false;
}

public boolean block() {
while (!isReleasable()) {
if (timed)
LockSupport.parkNanos(this, nanos);
else
LockSupport.park(this);
}
return true;
}
}

register系列

用于向Phaser注册参与者.

register系列提供了两个方法,registerbulkRegister两个方法,本质上都是调用doRegister方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private int doRegister(int registrations) {
// adjustment to state
// 注册后的State值.
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
// 父节点
final Phaser parent = this.parent;
int phase;
for (;;) {
// 父节点为空,拿到State, 否则重新同步下State.
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
// 参与者数量
int parties = counts >>> PARTIES_SHIFT;
// 没到达的数量
int unarrived = counts & UNARRIVED_MASK;
// 注册后,参与者数量超出最大值, 报错
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
// 当前所在的阶段编号
phase = (int)(s >>> PHASE_SHIFT);
// <0,退出
if (phase < 0)
break;
// 不是第一次注册
if (counts != EMPTY) { // not 1st registration
// 没有父节点,或者同步状态已经完成
if (parent == null || reconcileState() == s) {
// 没有未到达的参与者了, 让根节点开始升级
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
// 有尚未到达的参与者,更新这次到达后的状态,返回
else if (STATE.compareAndSet(this, s, s + adjust))
break;
}
}
// 父节点为空, 但是是第一次注册
else if (parent == null) { // 1st root registration
// 计算下一个状态值且CAS设置
long next = ((long)phase << PHASE_SHIFT) | adjust;
// 直接设置State,返回
if (STATE.compareAndSet(this, s, next))
break;
}
else {
// 是第一次注册,且是树形结构
synchronized (this) { // 1st sub registration
// 检查State
if (state == s) { // recheck under lock
// 把当前节点注册到其父节点上去.
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
// 注册成功, 当前节点的参与者数量等值的设置
while (!STATE.weakCompareAndSet
(this, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}

这是核心的注册方法,主要有三个分支

  • 没有父节点,且第一次注册.

这是最简单的,直接将注册后的State更新进去即可.

  • 有父节点,但是不是第一次注册

检查下注册后,当前节点是否全部到达了,如果是, 当前节点升级,并且告诉父节点.

  • 有父节点,是第一次注册.

首先将当前节点注册到父节点,之后更新当前节点的参与者信息等.

arrive系列

arrivearriveAndDeregister 到达,不等待其他参与者

这两个方法都实现到达相关逻辑, 调用doArrive来实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

/**
* Arrives at this phaser, without waiting for others to arrive.
*
* <p>It is a usage error for an unregistered party to invoke this
* method. However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*
* @return the arrival phase number, or a negative value if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
public int arrive() {
return doArrive(ONE_ARRIVAL);
}

/**
* Arrives at this phaser and deregisters from it without waiting
* for others to arrive. Deregistration reduces the number of
* parties required to advance in future phases. If this phaser
* has a parent, and deregistration causes this phaser to have
* zero parties, this phaser is also deregistered from its parent.
*
* <p>It is a usage error for an unregistered party to invoke this
* method. However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*
* @return the arrival phase number, or a negative value if terminated
* @throws IllegalStateException if not terminated and the number
* of registered or unarrived parties would become negative
*/
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}

doArrive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

/**
* Main implementation for methods arrive and arriveAndDeregister.
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
* @param adjust value to subtract from state;
* ONE_ARRIVAL for arrive,
* ONE_DEREGISTER for arriveAndDeregister
*/
// arrive和arriveAAndDeregister两个方法的主要实现.
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
// 如果不是树形结构,拿到State,如果是,进行一次状态同步后拿到State.
long s = (root == this) ? state : reconcileState();
// 当前阶段
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0) // 阶段小于0,直接退出
return phase;
int counts = (int)s; // 这个s是高16位是参与者数量,低16位是未到达数量, 整合的一个数字
// 没有到达的数量
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
// 到达成功, 更改State成功.
if (STATE.compareAndSet(this, s, s-=adjust)) {
// 当前是最后一个到达的
if (unarrived == 1) {
// 参数者数量
long n = s & PARTIES_MASK; // base of next state
// 下一个的未到达数量, 当前是最后一个,下一个其实是0
int nextUnarrived = (int)n >>> PARTIES_SHIFT;

// 不是树形结构
if (root == this) {
// 如果需要终止, 就终止
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
// 如果没有参与者,也没有到达的,Phaser置为空.
else if (nextUnarrived == 0)
n |= EMPTY;
else
// 升级后应该的n
n |= nextUnarrived;
// 下一个阶段编号
int nextPhase = (phase + 1) & MAX_PHASE;
// 计算新的State并写入
n |= (long)nextPhase << PHASE_SHIFT;
STATE.compareAndSet(this, s, n);
// 释放等待的节点
releaseWaiters(phase);
}
// 树形结构, 且当前节点全倒了,父节点进行到达行为, 然后当前节点跟进状态
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
STATE.compareAndSet(this, s, s | EMPTY);
}
else
// 父节点到达
phase = parent.doArrive(ONE_ARRIVAL);
}
// 如果不是最后一个到达的, 直接返回就好了
return phase;
}
}
}

主要作用是对State中的未到达数量进行递减, 如果递减完,还有未到达的参与者,直接返回当前阶段,如果递减完,当前所有参与者都到达了.
有三个分支:

  • 非树形结构

直接计算下一个状态,进行写入.

  • 树形结构,且因为注销,没有参与者了.

向父节点注销当前节点, 当前节点置为空.

  • 树形结构,且还有参与者

向父节点传递当前节点完全到达的消息.

arriveAndAwaitAdvance 到达然后等待其他参与者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

/**
* Arrives at this phaser and awaits others. Equivalent in effect
* to {@code awaitAdvance(arrive())}. If you need to await with
* interruption or timeout, you can arrange this with an analogous
* construction using one of the other forms of the {@code
* awaitAdvance} method. If instead you need to deregister upon
* arrival, use {@code awaitAdvance(arriveAndDeregister())}.
*
* <p>It is a usage error for an unregistered party to invoke this
* method. However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*
* @return the arrival phase number, or the (negative)
* {@linkplain #getPhase() current phase} if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
// 到达并等待其他参与者,等价于调用`awaitAdvance(arrive())`.
public int arriveAndAwaitAdvance() {
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;) {
// 这块和之前逻辑一样
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
// 到达一个更新State成功
if (STATE.compareAndSet(this, s, s -= ONE_ARRIVAL)) {
// 如果当前不是最后一个到达的参与者,阻塞等待
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
// 如果当前节点是最后一个到达的参与者,向父节点进行“到达且等待”操作
if (root != this)
return parent.arriveAndAwaitAdvance();
// 这里是,当前节点是最后一个到达的参与者,且当前节点不是树形结构
// 计算新的State并设置State
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!STATE.compareAndSet(this, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
// 唤醒等待者
releaseWaiters(phase);
return nextPhase;
}
}
}

到达一个参与者,且阻塞等待Phaser的升级行为. 首先将当前Phaser的状态进行递减,之后主要有三个分支:

  • 不是最后一个到达的.

从跟进点进行等待升级

  • 是最后一个到达的,且有父节点

调用父节点的arriveAndAwaitAdvance,向父节点报告当前节点已经完全到达,开始等待升级.

  • 是最后一个到达的,且是根节点

计算新的状态,设置状态然后唤醒等待者.

await系列

awaitAdvance, awaitAdvanceInterruptibly, awaitAdvanceInterruptibly 三个await系列的方法,本质上都是调用的
父节点的internalAwaitAdvance. 只是支持了中断和超时而已.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* Awaits the phase of this phaser to advance from the given phase
* value, returning immediately if the current phase is not equal
* to the given phase value or this phaser is terminated.
*
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
* previous call to {@code arrive} or {@code arriveAndDeregister}.
* @return the next arrival phase number, or the argument if it is
* negative, or the (negative) {@linkplain #getPhase() current phase}
* if terminated
*/
// 等待阶段升级
// 如果给定的阶段和当前不一致,或者当前Phaser终止了,直接返回.
public int awaitAdvance(int phase) {
final Phaser root = this.root;
// 拿到Staate.
long s = (root == this) ? state : reconcileState();
// 当前的阶段
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
// 如果一样,等待父节点升级,
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}



/**
* Possibly blocks and waits for phase to advance unless aborted.
* Call only on root phaser.
*
* @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout;
* if null, denotes noninterruptible wait
* @return current phase
*/
// 只有根节点调用,可能会阻塞
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
// 奇偶队列交替使用,确保旧的是空的
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
// 当给定的阶段和当前阶段一致时
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
// 节点为空,
if (node == null) { // spinning in noninterruptible mode
// 计算自旋次数
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
// 自旋为0或者线程中断了,创建一个节点
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
else
// 休眠等待
Thread.onSpinWait();
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
// 入队
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException cantHappen) {
node.wasInterrupted = true;
}
}
}

if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}

让当前线程自旋或者进入队列等待Phaser的升级,也就是等待其他所有参与者的到达.

强行终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Forces this phaser to enter termination state. Counts of
* registered parties are unaffected. If this phaser is a member
* of a tiered set of phasers, then all of the phasers in the set
* are terminated. If this phaser is already terminated, this
* method has no effect. This method may be useful for
* coordinating recovery after one or more tasks encounter
* unexpected exceptions.
*/
public void forceTermination() {
// Only need to change root state
final Phaser root = this.root;
long s;
while ((s = root.state) >= 0) {
if (STATE.compareAndSet(root, s, s | TERMINATION_BIT)) {
// signal all threads
releaseWaiters(0); // Waiters on evenQ
releaseWaiters(1); // Waiters on oddQ
return;
}
}
}

比较简单, 只要根节点的状态不为0,就强行设置为终止了. 释放所有的等待节点.

onAdvance

这是预留给子类的一个方法, 可以定义Phaser升级时执行的动作,还可以定义锁是否要升级. 默认实现是注册的参与者为0, 就终止整个Phaser.

监控方法

还有很多负责监控当前Phaser状态 的方法,这里简单记录一下 .

  • getPhase 拿到阶段编号
  • getRegisteredParites 拿到当前的参与者数量
  • getArrivedParties 拿到当前到达的参与者数量
  • getUnarrivedParties 未到达的参与者数量
  • getParent 返回当前节点的父节点
  • getRoot 获取根节点
  • isTerminated 是否被终止

总结

Phaser是一个用于多阶段任务的同步器,没有使用AQS框架来实现,而是自己实现的。

内部的核心还是State的定义.

高32位记录当前的阶段编号,16-32为记录共有多少个参与者, 低16位记录还有多少个参与者没有到达.

提供三类方法:

  • 注册

修改16-32位,与其他同步器相比,提供了更多的灵活性,可以修改参与者的数量

  • 到达

修改低16位,当全部到达后,进行升级,升级通过修改高32位来记录阶段编号

  • 等待

让先到达的线程,阻塞等待所有参与者的到达,也就是升级行为完成后,被唤醒.

为了支持更大的并发度,Phaser支持以树结构创建,叶子节点接受所有参与者的到达,控制所有注册到自己的参与者. 父节点控制自己的子节点.
根节点控制所有是否进行放行,唤醒所有等待线程.


完.


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十