/** * Creates a new phaser with no initially registered parties, no * parent, and initial phase number 0. Any thread using this * phaser will need to first register for it. */ publicPhaser() { this(null, 0); }
/** * Creates a new phaser with the given number of registered * unarrived parties, no parent, and initial phase number 0. * * @param parties the number of parties required to advance to the * next phase * @throws IllegalArgumentException if parties less than zero * or greater than the maximum number of parties supported */ publicPhaser(int parties) { this(null, parties); }
/** * Creates a new phaser with the given parent and number of * registered unarrived parties. When the given parent is non-null * and the given number of parties is greater than zero, this * child phaser is registered with its parent. * * @param parent the parent phaser * @param parties the number of parties required to advance to the * next phase * @throws IllegalArgumentException if parties less than zero * or greater than the maximum number of parties supported */ publicPhaser(Phaser parent, int parties) { // 高16位有值,异常,初始化时不可以已有参与者 if (parties >>> PARTIES_SHIFT != 0) thrownewIllegalArgumentException("Illegal number of parties"); // 当前阶段置为0.初始值 intphase=0; this.parent = parent; // 给定的父节点不为空, 是一个树形的Phaser. if (parent != null) { // 共享同一个root节点,还有所有节点共享队列 finalPhaserroot= parent.root; this.root = root; this.evenQ = root.evenQ; this.oddQ = root.oddQ; // 如果参与者不为0,当前节点是一个有效的节点,向当前节点的父节点,注册一个参与者,代表(当前节点需要父节点等待) if (parties != 0) phase = parent.doRegister(1); } else { // 父节点为空,当前是孤立的,非树形的Phaser. // 赋值一些属性 this.root = this; this.evenQ = newAtomicReference<QNode>(); this.oddQ = newAtomicReference<QNode>(); } // 初始化状态,如果没有参与者,赋值为EMPTY=1 // 如果有, state=高32位记录阶段号,16-32位记录参与者数量,低16记录没有到达的数量. 初始化的时候,参与者数量和没有到达的数量是一致的 this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties); }
/** * The root of phaser tree. Equals this if not in a tree. */ // 当前Phaser的根节点. privatefinal Phaser root;
/** * Heads of Treiber stacks for waiting threads. To eliminate * contention when releasing some threads while adding others, we * use two of them, alternating across even and odd phases. * Subphasers share queues with root to speed up releases. */ // 等待线程栈的头结点. // 根据奇偶数使用不同的Phaser. // 子节点共享相同的两个栈. privatefinal AtomicReference<QNode> evenQ; privatefinal AtomicReference<QNode> oddQ;
/** * Arrives at this phaser, without waiting for others to arrive. * * <p>It is a usage error for an unregistered party to invoke this * method. However, this error may result in an {@code * IllegalStateException} only upon some subsequent operation on * this phaser, if ever. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number * of unarrived parties would become negative */ publicintarrive() { return doArrive(ONE_ARRIVAL); }
/** * Arrives at this phaser and deregisters from it without waiting * for others to arrive. Deregistration reduces the number of * parties required to advance in future phases. If this phaser * has a parent, and deregistration causes this phaser to have * zero parties, this phaser is also deregistered from its parent. * * <p>It is a usage error for an unregistered party to invoke this * method. However, this error may result in an {@code * IllegalStateException} only upon some subsequent operation on * this phaser, if ever. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number * of registered or unarrived parties would become negative */ publicintarriveAndDeregister() { return doArrive(ONE_DEREGISTER); }
/** * Arrives at this phaser and awaits others. Equivalent in effect * to {@code awaitAdvance(arrive())}. If you need to await with * interruption or timeout, you can arrange this with an analogous * construction using one of the other forms of the {@code * awaitAdvance} method. If instead you need to deregister upon * arrival, use {@code awaitAdvance(arriveAndDeregister())}. * * <p>It is a usage error for an unregistered party to invoke this * method. However, this error may result in an {@code * IllegalStateException} only upon some subsequent operation on * this phaser, if ever. * * @return the arrival phase number, or the (negative) * {@linkplain #getPhase() current phase} if terminated * @throws IllegalStateException if not terminated and the number * of unarrived parties would become negative */ // 到达并等待其他参与者,等价于调用`awaitAdvance(arrive())`. publicintarriveAndAwaitAdvance() { // Specialization of doArrive+awaitAdvance eliminating some reads/paths finalPhaserroot=this.root; for (;;) { // 这块和之前逻辑一样 longs= (root == this) ? state : reconcileState(); intphase= (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; intcounts= (int)s; intunarrived= (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) thrownewIllegalStateException(badArrive(s)); // 到达一个更新State成功 if (STATE.compareAndSet(this, s, s -= ONE_ARRIVAL)) { // 如果当前不是最后一个到达的参与者,阻塞等待 if (unarrived > 1) return root.internalAwaitAdvance(phase, null); // 如果当前节点是最后一个到达的参与者,向父节点进行“到达且等待”操作 if (root != this) return parent.arriveAndAwaitAdvance(); // 这里是,当前节点是最后一个到达的参与者,且当前节点不是树形结构 // 计算新的State并设置State longn= s & PARTIES_MASK; // base of next state intnextUnarrived= (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; elseif (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; intnextPhase= (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!STATE.compareAndSet(this, s, n)) return (int)(state >>> PHASE_SHIFT); // terminated // 唤醒等待者 releaseWaiters(phase); return nextPhase; } } }
/** * Awaits the phase of this phaser to advance from the given phase * value, returning immediately if the current phase is not equal * to the given phase value or this phaser is terminated. * * @param phase an arrival phase number, or negative value if * terminated; this argument is normally the value returned by a * previous call to {@code arrive} or {@code arriveAndDeregister}. * @return the next arrival phase number, or the argument if it is * negative, or the (negative) {@linkplain #getPhase() current phase} * if terminated */ // 等待阶段升级 // 如果给定的阶段和当前不一致,或者当前Phaser终止了,直接返回. publicintawaitAdvance(int phase) { finalPhaserroot=this.root; // 拿到Staate. longs= (root == this) ? state : reconcileState(); // 当前的阶段 intp= (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; // 如果一样,等待父节点升级, if (p == phase) return root.internalAwaitAdvance(phase, null); return p; }
/** * Possibly blocks and waits for phase to advance unless aborted. * Call only on root phaser. * * @param phase current phase * @param node if non-null, the wait node to track interrupt and timeout; * if null, denotes noninterruptible wait * @return current phase */ // 只有根节点调用,可能会阻塞 privateintinternalAwaitAdvance(int phase, QNode node) { // assert root == this; // 奇偶队列交替使用,确保旧的是空的 releaseWaiters(phase-1); // ensure old queue clean booleanqueued=false; // true when node is enqueued intlastUnarrived=0; // to increase spins upon change intspins= SPINS_PER_ARRIVAL; long s; int p; // 当给定的阶段和当前阶段一致时 while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { // 节点为空, if (node == null) { // spinning in noninterruptible mode // 计算自旋次数 intunarrived= (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; booleaninterrupted= Thread.interrupted(); // 自旋为0或者线程中断了,创建一个节点 if (interrupted || --spins < 0) { // need node to record intr node = newQNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } else // 休眠等待 Thread.onSpinWait(); } elseif (node.isReleasable()) // done or aborted break; elseif (!queued) { // push onto queue // 入队 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNodeq= node.next = head.get(); if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { try { ForkJoinPool.managedBlock(node); } catch (InterruptedException cantHappen) { node.wasInterrupted = true; } } }
if (node != null) { if (node.thread != null) node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p; }
/** * Forces this phaser to enter termination state. Counts of * registered parties are unaffected. If this phaser is a member * of a tiered set of phasers, then all of the phasers in the set * are terminated. If this phaser is already terminated, this * method has no effect. This method may be useful for * coordinating recovery after one or more tasks encounter * unexpected exceptions. */ publicvoidforceTermination() { // Only need to change root state finalPhaserroot=this.root; long s; while ((s = root.state) >= 0) { if (STATE.compareAndSet(root, s, s | TERMINATION_BIT)) { // signal all threads releaseWaiters(0); // Waiters on evenQ releaseWaiters(1); // Waiters on oddQ return; } } }