本文源码基于: JDK13
Condition
官方注释翻译
Condition
将Object
身上的方法wait,notify,nitofyAll
等分解为不同的对象,通过将他们与任意的锁实现结合,实现了每个对象有多个等待集的效果. 锁代替了同步方法和语句,条件代替了对象对象监视器方法的使用.
Conditions
(或者称为条件队列或者条件变量) 提供一个方法,让一个线程暂停wait
,直到被其他线程通知,说某个等待条件可能为真. 因为共享状态的
访问在不同的线程中,因此必须保护它,与某种形式的锁相关联. 等待条件提供的关键属性是: 他原子性的释放关联的锁并挂起当前的线程,就像Object.wait()
.
Condition
本质上是绑定到锁上的,要获取给定锁的条件实例,请调用locl.newCondition
方法.
比如,假设我们有一个支持put
和take
的有界缓冲区. 如果对一个空的缓冲区进行take
操作,线程将会阻塞,知道有元素可用. 如果对一个满的缓冲去进行put
操作,线程会阻塞直到缓冲区有空间,也就是阻塞队列的语义.
我们希望在单独的等待集中保持生产者和消费者的线程,这样我们可以在缓冲区有空间或者缓冲区不为空时,只唤醒一部分线程. 这可以使用两个Condition
实例来实现.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class BoundedBuffer<E> { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(E x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); E x = (E) items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
|
如果你看过BlockingQueue
相关的代码, 就会发现上面的代码兼职太眼熟了我的天.
Condition
的实现类可以提供与Object
的监控方法不同的行为和语义,比如保证通知的顺序,或者在执行通知时不需要持有锁. 如果实现提供了这样专门
的语义,那必须记录下来.
注意: Condition
对象只是普通的对象,他们可以用做同步语句中的目标,并且可以调用他们自己的等待和通知方法,获取Condition
实例的监视器所,或者使用他的监视器方法,与获取与Condition
关联的锁之间没有什么关系.
为了避免混淆,建议永远不要这么高.
除非特别说明,否则传递任何null都会导致NPE。
接口
- await 等待
- awaitUninterruptibly 不可中断的等待
- awaitNanos 等待指定毫秒
- await(time,unit) 等待指定时间
- awaitUntil 等待知道deakline到来
- signal 通知一个等待线程
- signalAll 通知全部等待线程
AQS中的ConditionObject
定义
1 2
| public class ConditionObject implements Condition, java.io.Serializable {
|
朴实无华,一个条件.
属性
1 2 3 4
| private transient Node firstWaiter;
private transient Node lastWaiter;
|
等待的可能有多个线程,因此总是需要一个队列来保存等待者的,这里使用链表实现,保存了链表的首和尾.
Node节点保存了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
|
保存了等待的状态,以及前后节点,还有当前节点的线程.
构造方法
1 2 3
| public ConditionObject() { }
|
创建一个空的条件队列.
await 系列
既然实现了Condition
接口,就按照接口的方法来看. 由于有多个关于时间控制的等待方法,为了避免冗余,我们只看一下await(time,unit)
方法,比较有代表性.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); final long deadline = System.nanoTime() + nanosTimeout; Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
|
其实这种方法很难讲,代码写的很复杂,但是核心其实就一个LockSupport.park()
,完事. 根据需要将当前线程休眠指定时间.
signal 唤醒系列
以signal为例,因为唤醒单个会了,唤醒全部大不了for循环~.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
从队头找到第一个等待线程,验证当前状态之后,进行唤醒.
ReentrantLock 中的Condition
其实ReentrantLock是使用AQS
实现的,为啥还要单独看呢?
因为在学习BlockingQueue
时,对于两个条件分别控制生产者等待和消费者等待印象深刻,而ArrayListBlockingQueue
是使用ReentrantLock
实现的,因此单独看一下。
在ReentrantLock
中,初始化一个Condition
,使用:
1 2 3 4
| final ConditionObject newCondition() { return new ConditionObject(); }
|
朴实无华,直接使用了AQS
的条件队列.
总结
Condition
定义了一个接口,允许线程在它的实例上阻塞,互相唤醒.
我们已经有了Object
提供了相关方法,为啥还需要Condition
呢?
就是ArrayListBlockingQueue
的情况了,Object
对象,只允许所有的线程因为同样的原因阻塞.
而我们需要不同的线程群根据不同的条件阻塞,条件满足时,部分唤醒. 因此需要Condition
。
同时,有了Condition
,我们还可以自定义很多逻辑,比如线程的唤醒顺序,或者添加更多自定义的hook方法等等,更加灵活.
AQS
中Condition
为所有基于AQS实现的类,提供了默认的ConditionObject
.
他内部使用链表来保存等待线程,使用CAS来保证更新的原子性. 因为在ArrayListBlockingQueue
中,使用两个条件队列才能那么丝滑.
参考文章
完。
联系我
最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十