// 抽象类,定义了传输的行为,他的实现类在下面 abstractstaticclassTransferer<E> { abstract E transfer(E e, boolean timed, long nanos); }
TransferStack 栈
内部保存了栈的头节点: head
1
volatile SNode head;
这个SNode也是内部类,比较简单.
1 2 3 4 5 6 7
staticfinalclassSNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; }
@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNodes=null; // constructed/reused as needed // 当前请求的类型是生产者还是消费者 intmode= (e == null) ? REQUEST : DATA;
// 自旋 for (;;) { SNodeh= head; // 如果当前栈为空,或者栈首元素的类型和当前类型一些. if (h == null || h.mode == mode) { // empty or same-mode // 超时了 if (timed && nanos <= 0L) { // can't wait // 头结点已经超时了,弹出该头节点,让下一个节点成为头节点 if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else // 超时了但是头结点为空,或者头结点还没取消,就返回空 returnnull; } elseif (casHead(h, s = snode(s, e, h, mode))) { // 更新头结点为当前节点 // 之后阻塞等待匹配操作 SNodem= awaitFulfill(s, timed, nanos); // 如果返回的m 是头结点,说明取消了,返回null if (m == s) { // wait was cancelled clean(s); returnnull; } // 如果头结点不为空, 且下一个节点是s. if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 返回匹配成功的item. return (E) ((mode == REQUEST) ? m.item : s.item); } } elseif (!isFulfilling(h.mode)) { // try to fulfill 没有正在进行中的匹配. // 查看头结点是否取消 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 将当前节点置为头结点 elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 等待匹配成功 for (;;) { // loop until matched or waiters disappear SNodem= s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNodemn= m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else// lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 正在匹配中 SNodem= h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNodemn= m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else// lost match h.casNext(m, mn); // help unlink } } } }
代码比较复杂,尝试写一下各种分支:
栈为空,或者栈首元素和当前的类型一致,要么都是消费者要么都是生产者.
如果超时了:
栈首元素已经被取消,就更新栈首元素,重新自旋.
栈首元素没取消或者为空,直接返回null. 结束.
没有超时,将当前节点放到栈首成功. 等待匹配.
匹配失败,超时了,返回null。
匹配成功,返回对应的元素.
没有正在进行的匹配.
如果栈首元素取消了,弹出它,换成他的next继续循环.
将栈首元素更换为当前元素,且状态为正在匹配,成功.
自旋等待匹配,匹配成功进行返回,失败继续匹配.
更新失败,继续循环.
正在进行匹配,协助更新栈首及next指针.
TransferQueue 队列
首先是队列中的节点,保存了指向向一个节点的指针,当前节点的元素,以及等待的线程.
1 2 3 4 5 6 7
//队列中的节点 staticfinalclassQNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark finalboolean isData; }
@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNodes=null; // constructed/reused as needed booleanisData= (e != null);
for (;;) { QNodet= tail; QNodeh= head; if (t == null || h == null) // saw uninitialized value continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode QNodetn= t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0L) // can't wait returnnull; if (s == null) s = newQNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue;
advanceTail(t, s); // swing tail and wait Objectx= awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); returnnull; }
if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e;
} else { // complementary-mode QNodem= h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read
Objectx= m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; }
publicvoidput(E e)throws InterruptedException { if (e == null) thrownewNullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); thrownewInterruptedException(); } }
直接调用transferer的传输方法,成功则返回,否则就抛出异常.
其他类似.
出队方法
1 2 3 4 5 6 7 8
public E take()throws InterruptedException { Ee= transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); thrownewInterruptedException(); }