    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null when not waiting for a match
        // ...
    }
Each node stores its role (producer or consumer), its element, a pointer to the next node, and the waiting thread.
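To make the role of `isData` and the CAS on `item` more concrete, here is a minimal sketch of how a dual-queue node can tell whether it has been matched. It is not the JDK code: the class `DualNodeSketch`, the `AtomicReference` wrapper, and the `demo` method are assumptions made for illustration, and the real `Node` uses a volatile field instead. The rule sketched here is that a data node counts as matched once its item has been taken (set to null), while a request node counts as matched once an item has been filled in.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified node; not the JDK's internal Node class.
final class DualNodeSketch {

    static final class Node {
        final boolean isData;               // true for a producer node, false for a request
        final AtomicReference<Object> item; // stands in for the volatile item field

        Node(Object e) {
            this.isData = (e != null);
            this.item = new AtomicReference<>(e);
        }

        // Assumed rule: data node matched when item is gone, request node when item arrived.
        boolean isMatched() {
            return isData == (item.get() == null);
        }

        // Only one thread can win this CAS, so a node is matched at most once.
        boolean tryMatch(Object expected, Object replacement) {
            return item.compareAndSet(expected, replacement);
        }
    }

    // Returns {matchedBefore, firstCasWon, secondCasWon, matchedAfter, requestMatchedAfterFill}.
    static boolean[] demo() {
        Object payload = "x";
        Node data = new Node(payload);
        boolean before = data.isMatched();
        boolean first = data.tryMatch(payload, null);  // a consumer takes the item
        boolean second = data.tryMatch(payload, null); // a late consumer loses the race
        boolean after = data.isMatched();

        Node request = new Node(null);                 // a waiting consumer
        request.tryMatch(null, "y");                   // a producer fills in the item
        return new boolean[] { before, first, second, after, request.isMatched() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

Because matching is a single compare-and-set on the item, producers and consumers can share one list without taking a lock.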
LinkedTransferQueue fields
    transient volatile Node head;
    private transient volatile Node tail;
    private transient volatile int sweepVotes;
These fields hold the head and tail nodes of the list, which is the usual layout for a linked list.
Constructors
    public LinkedTransferQueue() {
        head = tail = new Node();
    }

    public LinkedTransferQueue(Collection<? extends E> c) {
        Node h = null, t = null;
        for (E e : c) {
            Node newNode = new Node(Objects.requireNonNull(e));
            if (h == null)
                h = t = newNode;
            else
                t.appendRelaxed(t = newNode);
        }
        if (h == null)
            h = t = new Node();
        head = h;
        tail = t;
    }
Two constructors are provided: one creates an empty transfer queue, and the other adds every element of the given collection to the queue.
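The behaviour of the two constructors can be checked through the public API alone. The sketch below is illustrative only; the class name `ConstructorDemo` and its helper methods are made up for this example. It shows that the no-argument constructor yields an empty queue, that the collection constructor keeps the iteration order of the source collection, and that a null element is rejected, which corresponds to the `Objects.requireNonNull(e)` call in the constructor above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical demo class; only LinkedTransferQueue itself comes from the JDK.
final class ConstructorDemo {

    // Size of a queue built with the no-argument constructor.
    static int emptySize() {
        return new LinkedTransferQueue<String>().size();
    }

    // Elements of a queue built from a collection, in the order they are removed.
    static List<String> copyOrder() {
        LinkedTransferQueue<String> q =
                new LinkedTransferQueue<>(Arrays.asList("a", "b", "c"));
        List<String> out = new ArrayList<>();
        q.drainTo(out); // removes all available elements, head first
        return out;
    }

    // True if a collection containing null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        try {
            new LinkedTransferQueue<String>(Arrays.asList("a", null));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(emptySize());
        System.out.println(copyOrder());
        System.out.println(rejectsNull());
    }
}
```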
Enqueue and dequeue methods (producers and consumers)
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }

    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
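All of these methods delegate to `xfer` and differ only in the mode they pass: `ASYNC` for `put` and the timed `offer`, `NOW` for `poll()`, `TIMED` for the timed `poll` and `tryTransfer`, and `SYNC` for `take` and `transfer`. A rough way to see the effect from the outside is the sketch below, which uses only the public API. The class `TransferModesDemo`, the `run` method, and the chosen timeouts are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only LinkedTransferQueue itself comes from the JDK.
final class TransferModesDemo {

    // Calls each method once and records what it returned.
    static List<Object> run() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        List<Object> results = new ArrayList<>();
        try {
            q.put("a");                                       // ASYNC: enqueues and returns
            results.add(q.offer("b", 1, TimeUnit.SECONDS));   // ASYNC: timeout unused, true
            results.add(q.poll());                            // NOW: "a"
            results.add(q.poll());                            // NOW: "b"
            results.add(q.poll());                            // NOW on an empty queue: null
            results.add(q.poll(10, TimeUnit.MILLISECONDS));   // TIMED: null after the wait
            // TIMED: no consumer shows up, so the element is withdrawn again.
            results.add(q.tryTransfer("c", 10, TimeUnit.MILLISECONDS));
            results.add(q.isEmpty());

            // SYNC: transfer returns only once a consumer has received the element.
            AtomicReference<String> received = new AtomicReference<>();
            Thread consumer = new Thread(() -> {
                try {
                    received.set(q.take());
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            q.transfer("d");
            consumer.join();
            results.add(received.get());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The timed `offer` ignores its timeout because the queue is unbounded, so an insertion never has to wait for space. Only `transfer` and `tryTransfer` tie the producer to the arrival of a consumer.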