/** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protectedfinalintgetState(){ return state; }
/** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protectedfinalvoidsetState(int newState){ state=newState; }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protectedfinalbooleancompareAndSetState(int expect,int update){ return STATE.compareAndSet(this,expect,update); }
分别提供了get/set方法及CAS的赋值方法.
head
等待队列队头.
tail
等待队列队尾.
head 和 tail 是java.util.concurrent.locks.AbstractQueuedSynchronizer.Node 的实例, 构成了一个双向链表.
Node
Node是为了表达一个等待线程而抽象的数据结构,主要有以下几个属性.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Node节点所在的等待状态 volatileint waitStatus;
//前置节点 volatile Node prev;
// 后置节点 volatile Node next;
// 在这个节点上的线程 volatile Thread thread;
// 下一个等待的节点 Node nextWaiter;
它有两种模式,分别为共享模式及独占模式,对应不同的操作.
其中waitStatus是int类型的状态值(用常量表示,并非Java枚举),初始值为0,此外有以下几个取值.
1 2 3 4 5 6 7 8 9 10 11
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;

/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;

/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;

/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate.
 */
static final int PROPAGATE = -3;
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ publicfinalvoidacquire(int arg){ if(!tryAcquire(arg)&& acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) selfInterrupt(); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode){ Node node=newNode(mode);
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ privatestaticbooleanshouldParkAfterFailedAcquire(Node pred,Node node){ int ws=pred.waitStatus; if(ws==Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if(ws>0){ /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do{ node.prev=pred=pred.prev; }while(pred.waitStatus>0); pred.next=node; }else{ /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws,Node.SIGNAL); } returnfalse; }
如果前置节点的状态是SIGNAL,说明前置节点在释放时会唤醒它的后继节点,所以当前线程可以安全地park.
如果前置节点被取消了,扔掉中间的取消节点. 不park.
如果前置节点是其他状态(0或PROPAGATE),通过CAS把它设置为SIGNAL,表示当前节点需要被唤醒,但这一次先不park.
不park的原因是调用方需要再重试一次,确认确实无法获取之后再park.
如果当前线程需要被park.则park且检查下是否中断了.
parkAndCheckInterrupt
1 2 3 4 5 6 7 8 9
/** * Convenience method to park and then check if interrupted. * * @return {@code true} if interrupted */ privatefinalbooleanparkAndCheckInterrupt(){ LockSupport.park(this); return Thread.interrupted(); }
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary, although with // a possibility that a cancelled node may transiently remain // reachable. Node predNext=pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus=Node.CANCELLED;
// If we are the tail, remove ourselves. if(node==tail&&compareAndSetTail(node,pred)){ pred.compareAndSetNext(predNext,null); }else{ // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if(pred!=head&& ((ws=pred.waitStatus)==Node.SIGNAL|| (ws<=0&&pred.compareAndSetWaitStatus(ws,Node.SIGNAL)))&& pred.thread!=null){ Node next=node.next; if(next!=null&&next.waitStatus<=0) pred.compareAndSetNext(predNext,next); }else{ unparkSuccessor(node); }
node.next=node; // help GC } }
当前node的thread设置为null.
扔掉当前节点之前的所有被取消了的节点.
取消掉当前节点。
如果当前节点是尾节点,则把尾节点设置为前一个节点; 否则让前置节点的next指向当前节点的后继节点,或者直接唤醒后继节点.
release(int arg)
独占式的解锁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg){ if(tryRelease(arg)){ Node h=head; if(h!=null&&h.waitStatus!=0) unparkSuccessor(h); returntrue; } returnfalse; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws=node.waitStatus; if(ws< 0) node.compareAndSetWaitStatus(ws,0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s=node.next; if(s==null||s.waitStatus>0){ s=null; for(Node p=tail;p!=node&&p!=null;p=p.prev) if(p.waitStatus<=0) s=p; } if(s!=null) LockSupport.unpark(s.thread); }
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ publicfinalvoidacquireShared(int arg){ if(tryAcquireShared(arg)< 0) doAcquireShared(arg); }
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ publicfinalbooleanreleaseShared(int arg){ if(tryReleaseShared(arg)){ doReleaseShared(); returntrue; } returnfalse; }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ privatevoiddoReleaseShared(){ /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for(;;){ Node h=head; if(h!=null&&h!=tail){ int ws=h.waitStatus; if(ws==Node.SIGNAL){ if(!h.compareAndSetWaitStatus(Node.SIGNAL,0)) continue; // loop to recheck cases unparkSuccessor(h); } elseif(ws==0&& !h.compareAndSetWaitStatus(0,Node.PROPAGATE)) continue; // loop on failed CAS } if(h==head) // loop if head changed break; } }