(juc系列)Condition条件队列源码解析

本文源码基于: JDK13

Condition

官方注释翻译

ConditionObject身上的方法wait,notify,nitofyAll等分解为不同的对象,通过将他们与任意的锁实现结合,实现了每个对象有多个等待集的效果. 锁代替了同步方法和语句,条件代替了对象对象监视器方法的使用.

Conditions(或者称为条件队列或者条件变量) 提供一个方法,让一个线程暂停wait,直到被其他线程通知,说某个等待条件可能为真. 因为共享状态的
访问在不同的线程中,因此必须保护它,与某种形式的锁相关联. 等待条件提供的关键属性是: 他原子性的释放关联的锁并挂起当前的线程,就像Object.wait().

Condition本质上是绑定到锁上的,要获取给定锁的条件实例,请调用locl.newCondition方法.

比如,假设我们有一个支持puttake的有界缓冲区. 如果对一个空的缓冲区进行take操作,线程将会阻塞,知道有元素可用. 如果对一个满的缓冲去进行put操作,线程会阻塞直到缓冲区有空间,也就是阻塞队列的语义.

我们希望在单独的等待集中保持生产者和消费者的线程,这样我们可以在缓冲区有空间或者缓冲区不为空时,只唤醒一部分线程. 这可以使用两个Condition实例来实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class BoundedBuffer<E> {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(E x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
E x = (E) items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

如果你看过BlockingQueue相关的代码, 就会发现上面的代码兼职太眼熟了我的天.

Condition的实现类可以提供与Object的监控方法不同的行为和语义,比如保证通知的顺序,或者在执行通知时不需要持有锁. 如果实现提供了这样专门
的语义,那必须记录下来.

注意: Condition对象只是普通的对象,他们可以用做同步语句中的目标,并且可以调用他们自己的等待和通知方法,获取Condition实例的监视器所,或者使用他的监视器方法,与获取与Condition关联的锁之间没有什么关系.
为了避免混淆,建议永远不要这么高.

除非特别说明,否则传递任何null都会导致NPE。

接口

  • await 等待
  • awaitUninterruptibly 不可中断的等待
  • awaitNanos 等待指定毫秒
  • await(time,unit) 等待指定时间
  • awaitUntil 等待知道deakline到来
  • signal 通知一个等待线程
  • signalAll 通知全部等待线程

AQS中的ConditionObject

定义

1
2

public class ConditionObject implements Condition, java.io.Serializable {

朴实无华,一个条件.

属性

1
2
3
4

private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

等待的可能有多个线程,因此总是需要一个队列来保存等待者的,这里使用链表实现,保存了链表的首和尾.

Node节点保存了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

保存了等待的状态,以及前后节点,还有当前节点的线程.

构造方法

1
2
3

public ConditionObject() { }

创建一个空的条件队列.

await 系列

既然实现了Condition接口,就按照接口的方法来看. 由于有多个关于时间控制的等待方法,为了避免冗余,我们只看一下await(time,unit)方法,比较有代表性.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 计算纳秒时间
long nanosTimeout = unit.toNanos(time);
// 中断返回
if (Thread.interrupted())
throw new InterruptedException();
// We don't check for nanosTimeout <= 0L here, to allow
// await(0, unit) as a way to "yield the lock".
// 计算结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将当前节点添加到等待队列中
Node node = addConditionWaiter();

int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
// 状态ok就自旋
while (!isOnSyncQueue(node)) {
// 时间到了,结束
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
// 休眠给定时间
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 是否中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 状态OK?是否中断了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前节点还有后续节点,解除一些取消掉的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
// 返回结果
return !timedout;
}

其实这种方法很难讲,代码写的很复杂,但是核心其实就一个LockSupport.park(),完事. 根据需要将当前线程休眠指定时间.

signal 唤醒系列

以signal为例,因为唤醒单个会了,唤醒全部大不了for循环~.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}


final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

从队头找到第一个等待线程,验证当前状态之后,进行唤醒.

ReentrantLock 中的Condition

其实ReentrantLock是使用AQS实现的,为啥还要单独看呢?

因为在学习BlockingQueue时,对于两个条件分别控制生产者等待和消费者等待印象深刻,而ArrayListBlockingQueue是使用ReentrantLock实现的,因此单独看一下。

ReentrantLock中,初始化一个Condition,使用:

1
2
3
4

final ConditionObject newCondition() {
return new ConditionObject();
}

朴实无华,直接使用了AQS的条件队列.

总结

Condition定义了一个接口,允许线程在它的实例上阻塞,互相唤醒.

我们已经有了Object提供了相关方法,为啥还需要Condition呢?

就是ArrayListBlockingQueue的情况了,Object对象,只允许所有的线程因为同样的原因阻塞.

而我们需要不同的线程群根据不同的条件阻塞,条件满足时,部分唤醒. 因此需要Condition

同时,有了Condition,我们还可以自定义很多逻辑,比如线程的唤醒顺序,或者添加更多自定义的hook方法等等,更加灵活.

AQSCondition为所有基于AQS实现的类,提供了默认的ConditionObject.

他内部使用链表来保存等待线程,使用CAS来保证更新的原子性. 因为在ArrayListBlockingQueue中,使用两个条件队列才能那么丝滑.

参考文章


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十