(juc系列)并发集合之ConcurrentLinkedQueue源码

本文源码基于: JDK13

ConcurrentLinkedQueue

官方注释翻译

一个用链表实现的无界的线程安全的队列. 这个队列提供FIFO的元素顺序.

当多个线程需要共享一个集合的访问时, ConcurrentLinkedQueue是一个合适的选择. 向其他的并发集合实现一样,这个类不接受null元素.

这个实现使用了高效的无锁算法. 来源于paper Simple, Fast, and Practical Non-Blocking and Blocking
Concurrent Queue Algorithms

迭代器是弱同步的,返回元素是创建迭代器时的元素快照. 不会抛出ConcurrentModificationException. 可以和其他操作一起并发的进行. 迭代器创建时的元素,将会被精准的返回一次.

需要注意的是,和其他大多数集合不同,size方法不是常量时间的操作. 因为队列的异步特性,决定了计数当前的元素需要遍历所有元素,因此如果有别的线程正在更改,size方法可能返回不准确的数字.

批量操作不保证原子性,比如addAll等. 当foreachaddAll一起运行时,可能foreach只能观察到部分的元素.

这个类和他的迭代器实现了QueueIterator的所有可选方法.

源码

定义

1
2
3

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {

这是一个队列.

链表节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

static final class Node<E> {
volatile E item;
volatile Node<E> next;

/**
* Constructs a node holding item. Uses relaxed write because
* item can only be seen after piggy-backing publication via CAS.
*/
Node(E item) {
ITEM.set(this, item);
}

/** Constructs a dead dummy node. */
Node() {}

void appendRelaxed(Node<E> next) {
// assert next != null;
// assert this.next == null;
NEXT.set(this, next);
}

boolean casItem(E cmp, E val) {
// assert item == cmp || item == null;
// assert cmp != null;
// assert val == null;
return ITEM.compareAndSet(this, cmp, val);
}
}

保存了当前节点的数据Item及指向下一个节点的指针next.

提供了两个cas方法,分别用来更改数据以及指针.

属性

1
2
3
4

transient volatile Node<E> head;

private transient volatile Node<E> tail;

保存了链表的头尾节点.

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public ConcurrentLinkedQueue() {
head = tail = new Node<E>();
}

public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else
t.appendRelaxed(t = newNode);
}
if (h == null)
h = t = new Node<E>();
head = h;
tail = t;
}

提供了两个构造方法,支持创建空的队列和将给定的集合全部初始化进队列.

入队方法 offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

public boolean add(E e) {
return offer(e);
}

public boolean offer(E e) {
// 创建新的节点
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 尾节点的下一个为空. 直接cas更新,且成功了
if (q == null) {
// p is last node
if (NEXT.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time; failure is OK
TAIL.weakCompareAndSet(this, t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
// p节点被删除了,也就是出队了. 重新设置p节点的值
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 如果别的地方更新了尾节点,看一下应该继续向后找还是.
p = (p != t && t != (t = tail)) ? t : q;
}
}

offer方法进行实际的添加操作,将给定的节点,链接到已有队列的尾部. 过程中要充分考虑到与其他线程产生竞争的情况.

出队方法 poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public E poll() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
// 从队头开始
final E item;
// 如果队头就OK,直接cas更新,并且返回结果
if ((item = p.item) != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 头结点的下一个为空,队列为空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 当前节点出队了,重新从队头开始
else if (p == q)
continue restartFromHead;
}
}
}

从队头开始遍历,如果成功拿到头结点,且CAS更新成功,就返回. 否则继续找到下一个.

查看队首 peek

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

public E peek() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
// 队头元素OK. 返回队头元素
if ((item = p.item) != null
|| (q = p.next) == null) {
updateHead(h, p);
return item;
}
// 当前节点出队了,重新找队头
else if (p == q)
continue restartFromHead;
}
}
}

比较简单,不断尝试获取队头元素.

查看数量 size

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

public int size() {
restartFromHead: for (;;) {
int count = 0;
// 从队头开始计数
for (Node<E> p = first(); p != null;) {
if (p.item != null)
if (++count == Integer.MAX_VALUE)
break; // @see Collection.size()
// 当前元素出队了,从头开始计数
if (p == (p = p.next))
continue restartFromHead;
}

return count;
}
}

每次都从队头开始计数, 如果中间与双开被别人更改的情况,就重新从队头开始计数.

总结

一个非阻塞的,线程安全的队列,全程无锁,采用CAS+自旋实现.

参考文章


完。





联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十