(juc系列)并发集合之ConcurrentLinkedDeque源码

本文源码基于: JDK13

ConcurrentLinkedDeque

官方注释翻译

一个无界的,并发的双端队列,使用链表实现. 多线程间的并发写入,移除,访问操作,可以保证安全.当有很多线程共享一个公共集合时,ConcurrentLinkedDeque
是一个不错的选择. 像其他的并发集合一样,这个类不接受null元素.

迭代器是弱一致的.

需要注意的是,和其他大多数集合不同,size方法不是常量时间的操作. 因为队列的异步特性,决定了计数当前的元素需要遍历所有元素,因此如果有别的线程正在更改,size方法可能返回不准确的数字.

批量操作不保证原子性,比如addAll等. 当foreachaddAll一起运行时,可能foreach只能观察到部分的元素.

这个类和他的迭代器实现了QueueIterator的所有可选方法.

源码

定义

1
2
3
4

public class ConcurrentLinkedDeque<E>
extends AbstractCollection<E>
implements Deque<E>, java.io.Serializable {

一个双端队列.

内部链表节点

1
2
3
4
5
6

static final class Node<E> {
volatile Node<E> prev;
volatile E item;
volatile Node<E> next;
}

前后结点的指针,以及当前节点的元素.

属性

1
2
3
4

private transient volatile Node<E> head;

private transient volatile Node<E> tail;

保存了头尾节点

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public ConcurrentLinkedDeque() {
head = tail = new Node<E>();
}

public ConcurrentLinkedDeque(Collection<? extends E> c) {
// Copy c into a private chain of Nodes
Node<E> h = null, t = null;
for (E e : c) {
Node<E> newNode = newNode(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else {
NEXT.set(t, newNode);
PREV.set(newNode, t);
t = newNode;
}
}
initHeadTail(h, t);
}

两个构造方法,一个构造空的队列,一个将给定集合初始化到队列中.

入队方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

public void addFirst(E e) {
linkFirst(e);
}

public void addLast(E e) {
linkLast(e);
}

public boolean offerFirst(E e) {
linkFirst(e);
return true;
}

public boolean offerLast(E e) {
linkLast(e);
return true;
}

支持队头和队尾的添加操作,具体调用的是linkFirstlinkLast.

  • linkFirst
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

private void linkFirst(E e) {
// 创建当前节点
final Node<E> newNode = newNode(Objects.requireNonNull(e));

restartFromHead:
for (;;)
for (Node<E> h = head, p = h, q;;) {
// 如果节点的前置节点不为空,更新p节点
if ((q = p.prev) != null &&
(q = (p = q).prev) != null)
// Check for head updates every other hop.
// If p == q, we are sure to follow head instead.
p = (h != (h = head)) ? h : q;
// p节点出队了重新从头开始
else if (p.next == p) // PREV_TERMINATOR
continue restartFromHead;
else {
// p is first node
// 将当前节点设置为第一个.
NEXT.set(newNode, p); // CAS piggyback
// cas 更新相关属性, 原有头结点的前置属性,以及新的头结点等.
if (PREV.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != h) // hop two nodes at a time; failure is OK
HEAD.weakCompareAndSet(this, h, newNode);
return;
}
// Lost CAS race to another thread; re-read prev
}
}
}

将当前节点,设置为第一个节点,采用CAS+自旋实现,当发现已有头结点出队后,重新找头结点.

  • linkLast

链接为节点,和头结点思路一致.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

private void linkLast(E e) {
final Node<E> newNode = newNode(Objects.requireNonNull(e));

restartFromTail:
for (;;)
for (Node<E> t = tail, p = t, q;;) {
if ((q = p.next) != null &&
(q = (p = q).next) != null)
// Check for tail updates every other hop.
// If p == q, we are sure to follow tail instead.
p = (t != (t = tail)) ? t : q;
else if (p.prev == p) // NEXT_TERMINATOR
continue restartFromTail;
else {
// p is last node
PREV.set(newNode, p); // CAS piggyback
if (NEXT.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time; failure is OK
TAIL.weakCompareAndSet(this, t, newNode);
return;
}
// Lost CAS race to another thread; re-read next
}
}
}

出队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

public E pollFirst() {
restart: for (;;) {
for (Node<E> first = first(), p = first;;) {
// 队头节点, cas更改属性
final E item;
if ((item = p.item) != null) {
// recheck for linearizability
if (first.prev != null) continue restart;
if (ITEM.compareAndSet(p, item, null)) {
unlink(p);
return item;
}
}
// 已出队,重新开始
if (p == (p = p.next)) continue restart;
// p为空,队列为空,返回空
if (p == null) {
if (first.prev != null) continue restart;
return null;
}
}
}
}

public E pollLast() {
restart: for (;;) {
for (Node<E> last = last(), p = last;;) {
final E item;
if ((item = p.item) != null) {
// recheck for linearizability
if (last.next != null) continue restart;
if (ITEM.compareAndSet(p, item, null)) {
unlink(p);
return item;
}
}
if (p == (p = p.prev)) continue restart;
if (p == null) {
if (last.next != null) continue restart;
return null;
}
}
}
}

与入队对应的,将队首或者队尾进行弹出. 思路一致.

普通队列的操作

双端队列可以向普通队列一样,提供入队出队操作,此时他是一个FIFO的队列,也就是入队添加到队尾,出队从队头获取元素.

总结

ConcurrentLinkedQueue思路一致,使用CAS+自旋实现. 只是提供了双端队列相关的方法.

参考文章


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十