staticfinalclassNode<E> { volatile E item; volatile Node<E> next;
/** * Constructs a node holding item. Uses relaxed write because * item can only be seen after piggy-backing publication via CAS. */ Node(E item) { ITEM.set(this, item); }
publicConcurrentLinkedQueue() { head = tail = newNode<E>(); }
publicConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { Node<E> newNode = newNode<E>(Objects.requireNonNull(e)); if (h == null) h = t = newNode; else t.appendRelaxed(t = newNode); } if (h == null) h = t = newNode<E>(); head = h; tail = t; }
publicbooleanoffer(E e) { // 创建新的节点 final Node<E> newNode = newNode<E>(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 尾节点的下一个为空. 直接cas更新,且成功了 if (q == null) { // p is last node if (NEXT.compareAndSet(p, null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time; failure is OK TAIL.weakCompareAndSet(this, t, newNode); returntrue; } // Lost CAS race to another thread; re-read next } // p节点被删除了,也就是出队了. 重新设置p节点的值 elseif (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. // 如果别的地方更新了尾节点,看一下应该继续向后找还是. p = (p != t && t != (t = tail)) ? t : q; } }
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;; p = q) { // 从队头开始 final E item; // 如果队头就OK,直接cas更新,并且返回结果 if ((item = p.item) != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 头结点的下一个为空,队列为空了 elseif ((q = p.next) == null) { updateHead(h, p); returnnull; } // 当前节点出队了,重新从队头开始 elseif (p == q) continue restartFromHead; } } }
从队头开始遍历,如果成功拿到头结点,且CAS更新成功,就返回. 否则继续找到下一个.
查看队首 peek
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;; p = q) { final E item; // 队头元素OK. 返回队头元素 if ((item = p.item) != null || (q = p.next) == null) { updateHead(h, p); return item; } // 当前节点出队了,重新找队头 elseif (p == q) continue restartFromHead; } } }
比较简单,不断尝试获取队头元素.
查看数量 size
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
publicintsize() { restartFromHead: for (;;) { intcount=0; // 从队头开始计数 for (Node<E> p = first(); p != null;) { if (p.item != null) if (++count == Integer.MAX_VALUE) break; // @see Collection.size() // 当前元素出队了,从头开始计数 if (p == (p = p.next)) continue restartFromHead; } return count; } }