(juc系列)aqs源码学习笔记

Posted1 by 呼延十 on September 28, 2021 Hot:

前言

本文源码基于: JDK13

JUC是Java提供的一个并发工具包,提供了很多并发工具.

本文主要将AQS.

java.util.concurrent.locks.AbstractQueuedSynchronizer.

是一个基类,也可以理解为一个框架.

它提供了对于同步状态的控制,以前线程等待时的FIFO队列.

Fields

AQS的属性.

state

    /**
     * The synchronization state.
     */
    private volatile int state;

核心属性,同步状态. 使用volatile修饰.

与之对应的三个方法:

/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState(){
        return state;
        }

/**
 * Sets the value of synchronization state.
 * This operation has memory semantics of a {@code volatile} write.
 * @param newState the new state value
 */
protected final void setState(int newState){
        state=newState;
        }

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 */
protected final boolean compareAndSetState(int expect,int update){
        return STATE.compareAndSet(this,expect,update);
        }

分别提供了get/set方法及CAS的赋值方法.

等待队列队头.

tail

等待队列队尾.

head 和 tail 是java.util.concurrent.locks.AbstractQueuedSynchronizer.Node 的实例, 构成了一个双向链表.

Node

Node是为了表达一个等待线程而抽象的数据结构,主要有以下几个属性.


// Node节点所在的等待状态
volatile int waitStatus;

//前置节点
volatile Node prev;

// 后置节点
volatile Node next;

// 在这个节点上的线程
volatile Thread thread;

// 下一个等待的节点
        Node nextWaiter;

他有两种模式,分别为共享模式及独占模式. 对应不同的操作.

其中waitStatus为枚举值,有以下几个值.

/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED=1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL=-1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION=-2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate.
 */
static final int PROPAGATE=-3;

Public-Methods

AQS的方法可太多了. 先看一下对外提供的API方法.

众所周知,AQS是为了同步(加锁)而设计的. 那么一定是有获取锁,释放锁的方法的.先从这里切入.

acquire(int arg)

独占式应用,典型的就是ReentrantLock. ReentrantLock源码学习

独占模式的加锁代码.

    /**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg){
        if(!tryAcquire(arg)&&
        acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
        selfInterrupt();
        }

独占模式的获取锁, 并且忽略中断. 至少调用一次tryAcquire.如果成功了就返回.

否则的话将线程加入等待队列,重复的进行tryAcquire. 直到成功为止.

traAcquire(int arg)

这个方法在AQS中是抽象的, protected修饰. 由子类具体进行实现.

它定义的:

独占模式的获取锁, 如果可以获取到,返回成功,如果获取失败,线程应该被放入等待队列.

如果线程已经在等待队列中, 应该是被其他线程唤醒了.

总之: 这个方法是非阻塞的,立即返回的,要么成功加锁,返回true. 要么加锁失败,返回flase. ,之后的操作就不归这个方法管了.

addWaiter(Node node)

private方法,给当前线程创建一个Node并且放入等待队列.

    /**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode){
        Node node=new Node(mode);

        for(;;){
        Node oldTail=tail;
        if(oldTail!=null){
        node.setPrevRelaxed(oldTail);
        if(compareAndSetTail(oldTail,node)){
        oldTail.next=node;
        return node;
        }
        }else{
        initializeSyncQueue();
        }
        }
        }
  1. 创建一个Node.
  2. 如果队尾为空,说明等待队列没有初始化,进行初始化.
  3. 将当前节点设置为新的队尾.

acquireQueued(Node node, int arg)

一个final方法,子类无法重写.

将等待队列中的所有线程,进行获取锁的行为.

    /**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node,int arg){
        boolean interrupted=false;
        try{
        for(;;){
final Node p=node.predecessor();
        if(p==head&&tryAcquire(arg)){
        setHead(node);
        p.next=null; // help GC
        return interrupted;
        }
        if(shouldParkAfterFailedAcquire(p,node))
        interrupted|=parkAndCheckInterrupt();
        }
        }catch(Throwable t){
        cancelAcquire(node);
        if(interrupted)
        selfInterrupt();
        throw t;
        }
        }

如果当前节点的前置节点是头结点,说明当前节点是优先级最高的那个.尝试获取锁.

如果当前节点不是优先级最高的,或者获取锁失败了. 调用shouldParkAfterFailedAcquire.

shouldParkAfterFailedAcquire

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
        int ws=pred.waitStatus;
        if(ws==Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
        if(ws>0){
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do{
        node.prev=pred=pred.prev;
        }while(pred.waitStatus>0);
        pred.next=node;
        }else{
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws,Node.SIGNAL);
        }
        return false;
        }

如果前置节点是SIGNAL.说明前置节点优先级更高,当前线程应该park.

如果前置节点被取消了,扔掉中间的取消节点. 不park.

如果前置节点是其他状态,设置为SIGNAL. 优先级最高. 不park.

不park的原因是再来一次. 检测一遍.

如果当前线程需要被park.则park且检查下是否中断了.

parkAndCheckInterrupt
/**
 * Convenience method to park and then check if interrupted.
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt(){
        LockSupport.park(this);
        return Thread.interrupted();
        }

如果发生异常,则取消掉这次获取锁.

cancelAcquire(Node node)
    /**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node){
        // Ignore if node doesn't exist
        if(node==null)
        return;

        node.thread=null;

        // Skip cancelled predecessors
        Node pred=node.prev;
        while(pred.waitStatus>0)
        node.prev=pred=pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary, although with
        // a possibility that a cancelled node may transiently remain
        // reachable.
        Node predNext=pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus=Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if(node==tail&&compareAndSetTail(node,pred)){
        pred.compareAndSetNext(predNext,null);
        }else{
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if(pred!=head&&
        ((ws=pred.waitStatus)==Node.SIGNAL||
        (ws<=0&&pred.compareAndSetWaitStatus(ws,Node.SIGNAL)))&&
        pred.thread!=null){
        Node next=node.next;
        if(next!=null&&next.waitStatus<=0)
        pred.compareAndSetNext(predNext,next);
        }else{
        unparkSuccessor(node);
        }

        node.next=node; // help GC
        }
        }
  1. 当前node的thread设置为null.
  2. 扔掉当前节点之前的所有被取消了的节点.
  3. 取消掉当前节点。
  4. 设置尾节点为前一个节点.

release(int arg)

独占式的解锁.

    /**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg){
        if(tryRelease(arg)){
        Node h=head;
        if(h!=null&&h.waitStatus!=0)
        unparkSuccessor(h);
        return true;
        }
        return false;
        }

调用tryRelease(int arg). 如果解锁成功,唤醒头结点的后继节点. 如果解锁失败, 返回false.

tryRelease(int arg)

解锁操作,由子类负责具体实现,可以后期针对ReentrantLock学习.

这个方法,非阻塞式, 即时返回true/false. 代表是否释放成功.

unparkSuccessor(Node node)

    /**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node){
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws=node.waitStatus;
        if(ws< 0)
        node.compareAndSetWaitStatus(ws,0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s=node.next;
        if(s==null||s.waitStatus>0){
        s=null;
        for(Node p=tail;p!=node&&p!=null;p=p.prev)
        if(p.waitStatus<=0)
        s=p;
        }
        if(s!=null)
        LockSupport.unpark(s.thread);
        }

在等待队列中,从后向前找到正序的第一个需要唤醒的Node. 执行unpark操作.

acquireShared(int arg)

共享锁的相关实现,可以查看CountDownLatch的相关代码. CountDownLatch源码解析

共享模式的获取锁.忽略中断.

至少调用一次TryAcquireShared, 如果成功就返回,失败就将线程加入等待队列. 重复调用TryAcquireShared知道成功.

    /**
 * Acquires in shared mode, ignoring interrupts.  Implemented by
 * first invoking at least once {@link #tryAcquireShared},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquireShared} until success.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 */
public final void acquireShared(int arg){
        if(tryAcquireShared(arg)< 0)
        doAcquireShared(arg);
        }

tryAcquireShared(int arg)

抽象方法,由子类负责实现.

如果获取锁成功,直接返回. 如果获取失败,线程加入等待队列,如果线程已经加入,等待被其他人释放锁的动作唤醒.

doAcquireShared(int arg)

    /**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg){
final Node node=addWaiter(Node.SHARED);
        boolean interrupted=false;
        try{
        for(;;){
final Node p=node.predecessor();
        if(p==head){
        int r=tryAcquireShared(arg);
        if(r>=0){
        setHeadAndPropagate(node,r);
        p.next=null; // help GC
        return;
        }
        }
        if(shouldParkAfterFailedAcquire(p,node))
        interrupted|=parkAndCheckInterrupt();
        }
        }catch(Throwable t){
        cancelAcquire(node);
        throw t;
        }finally{
        if(interrupted)
        selfInterrupt();
        }
        }
  1. 首先添加一个SHARED模式的节点到等待队列.
  2. 如果当前节点的前置节点是head. 说明当前节点的优先级最高,尝试获取锁. 如果成功,则返回.
  3. 如果当前节点不是优先级最高的,或者获取锁失败了,调用shouldParkAfterFailedAcquire判断是否需要进行park. 如果需要,则park当前线程并检查中断.
  4. 如果不需要park.则自旋. 进行下一次判断,是否需要获取锁.
  5. 如果catch异常,则取消这次获取锁,流程同上面独占模式取消.

releaseShared(int arg)

共享模式的释放锁.

    /**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg){
        if(tryReleaseShared(arg)){
        doReleaseShared();
        return true;
        }
        return false;
        }

非阻塞式的释放锁.调用tryReleaseShared.

如果释放成功,调用doReleaseShared.如果失败,返回false.

tryReleaseShared(int arg)

抽象方法,具体由子类进行实现.

非阻塞式的,返回释放的结果.

doReleaseShared()

    /**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared(){
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for(;;){
        Node h=head;
        if(h!=null&&h!=tail){
        int ws=h.waitStatus;
        if(ws==Node.SIGNAL){
        if(!h.compareAndSetWaitStatus(Node.SIGNAL,0))
        continue;            // loop to recheck cases
        unparkSuccessor(h);
        }
        else if(ws==0&&
        !h.compareAndSetWaitStatus(0,Node.PROPAGATE))
        continue;                // loop on failed CAS
        }
        if(h==head)                   // loop if head changed
        break;
        }
        }

共享模式的释放锁操作. 通知后继者并且确保传播.

独占式的解锁,只需要唤醒下一个即可。而共享式的解锁,需要广播解锁消息.

遍历等待队列,将SIGNAL的节点继任者全部唤醒.

完.

参考文章


完。


联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。



以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十