开发者

Java AQS(AbstractQueuedSynchronizer)源码解析

开发者 https://www.devze.com 2023-02-10 10:41 出处:网络 作者: 花醉霜寒
目录AQS概要排他获取锁支持中断的获取锁支持超时时间的获取锁功能共享锁获取锁的释放取消获取锁总结AbstractQueuedSynchronizer被称为队列同步器,简称为大家熟知的AQS,这个类可以称作concurrent包的基础,该类提供
目录
  • AQS概要
  • 排他获取锁
  • 支持中断的获取锁
  • 支持超时时间的获取锁功能
  • 共享锁获取
  • 锁的释放
  • 取消获取锁
  • 总结

AbstractQueuedSynchronizer被称为队列同步器,简称为大家熟知的AQS,这个类可以称作concurrent包的基础,该类提供了同步的基本功能。该类包括如下几个核心要素:

  • AQS内部维护一个volatile修饰的state变量,state用于标记锁的状态;
  • AQS通过内部类Node记录当前是哪个线程持有锁;
  • AQS通过LockSupport的park和unPark方法来阻塞和唤醒线程;
  • AQS通过node来维护一个队列,用于保存所有阻塞的线程。

下面通过剖析源码来看看AQS是如何工作的。

AQS概要

AQS通过内部类Node记录当前是哪个线程持有锁,Node中有一个前驱节点和一个后继节点,形成一个双向链表,这个链表是一种CLH队列,其中waitStatus表示当前线程的状态,其可能的取值包括以下几种:

  • SIGNAL(-1),表示后继线程已经或者即将被阻塞,当前线程释放锁或者获取锁失败后需要唤醒后继线程;
  • CANCELLED(1),表示当前线程因为超时或者中断被取消,这个状态不可以被修改;
  • CONDITION(-2),当前线程为条件等待,其状态设置0之后才能去竞争锁;
  • PROPAGATE(-3),表示共享锁释放之后需要传递给后继节点,只有头结点的才会有该状态;
  • 0,该状态为初始值,不属于上面任意一种状态。

Node对象中还有一个nextWaiter变量,指向下一个条件等待节点,相当于在CLH队列的基础上维护了一个简单的单链表来关联条件等待的节点。

	static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;

        static final int SIGNAL    = -1;

        static final int CONDITION = -2;
  
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        ...
        构造方法
        ...
    }

Node提供了两种入队列的方法,即enq和addwaiter,enq方法如下所示,当尾节点tail为null时,表明阻塞队列还没有被初始化,通过CAS操作来设置头结点,头结点为new Node(),实际上头结点中没有阻塞的线程,算得上是一个空的节点(注意空节点和null是不一样的),然后进行tail=head操作,这也说明当head=tail的时候,队列中实际上是不存在阻塞线程的,然后将需要入队列的node放入队列尾部,将tail指向node。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
        	//如果tail为空,说明CLH队列没有被初始化,
            if (t == null) {
            	//初始化CLH队列,将head和tail指向一个new Node(),
            	//此时虽然CLH有一个节点,但是并没有真正意义的阻塞线程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	//将node放入队列尾部,并通过cas将tail指向node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter通常表示添加一个条件等待的节点入队列,该方法首先尝试通过CAS操作快速入队列,如果失败则通过调用enq来入队列。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        //尝试快速入队列
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //快速入队列失败则采用enq方入队列
        enq(node);
        return node;
    }

Node还提供了唤醒后继节点线程的功能,主要是通过LockSupport来实现的,源码如下所示,

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Nodjavascripte t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

排他获取锁

不支持中断的获取锁\color{green}{不支持中断的获取锁}不支持中断的获取锁

	//不可中断的获取锁
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //对中断做补偿,中断当前线程
            selfInterrupt();
    }

acquire方法首先会调用tryAcquire方法尝试获取锁,如果获取锁失败,首先通过addWaiter将当前线程放入CLH队列中,然后通过acquireQueued方法获取锁,acquireQueued方法源码如下所示:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        	//记录中断状态
            boolean interrupted = false;
            //自旋式的获取锁
            for (;;) {
                final Node p = node.predecessor();
                //当前线程为CLH中的第一个阻塞线程才会尝试去获取锁
                if (p == head && tryAcquire(arg)) {
                	//获取成功则更新head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //返回中断状态
                    return interrupted;
                }
                //判断中断信息
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        	//如果获取锁失败,则取消获取锁的操作
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued方法是无中断的获取锁,该方法有一个布尔类型的返回值,该值不是表示是否成功获取锁,而是标示当前线程的中断状态,因为acquireQueued方法是无法响应中断的,需要对中断进行补偿,这个补偿体现在acquire方法中。

    //模板方法tryAcquire需要子类进行具体实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

支持中断的获取锁

    //可中断的获取锁
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

acquireInterruptibly方法获取锁的过程中能够响应中断,主要体现在获取锁之前会判断一下当前线程的中断中断状态,若中断则抛出InterruptedException,然后通过tryAcquire获取锁,获取成功直接返回,获取失败则通过doAcquireInterruptibly获取锁,该方法和acquireQueued最大的区别就是在判断parkAndCheckInterrupt后,acquireQueued仅仅记录中断状态,parkAndCheckInterrupt则会抛出异常。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //抛出异常,响应中断
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

支持超时时间的获取锁功能

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        //首先通过tryAcquire快速获取锁,若失败则调用doAcquireNanos方法
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

从方法tryAcquireNanos的源码可以看出,该方法也是响应中断的,该方法首先调用模板方法tryAcquire快速的获取锁,如果失败则通过doAcquireNanos获取锁,doAcquireNanos中支持超时机制,其源码如下所示:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //判断如果超时则直接返回false,代表获取锁失败
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAf开发者_开发教程terFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireNanos方法与acquireQueued方法的区别是每次循环获取锁过程中都会计算deadline和当前时间的差值,如果这个差值小于0,则表示获取锁的操作已经超时,则直接返回false表示获取锁失败。

共享锁获取

AQS中不仅支持排他锁的获取,即acquire、acquireInterruptibly和tryAcquireNanos,还提供了共享锁的获取操作方法,包括acquireShared、acquireSharedInterruptibly和tryAcquireSharedNanos,这三个方法源码如下所示:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

共享锁的获取和排他锁的获取方法类似,共享锁调用了不同的模板方法tryAcquireShared,这里介绍一下doAcquireShared方法,其他方法变化的套路和共享锁的使用套路一样,doAcquireShared方法源码如下所示:

    private void doAcquireShared(int arg) {
    	//当前线程入队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋式的获取锁
            for (;;) {
                final Node p = node.predecessor();
                //只有队列中的第一个阻塞线程才能获取锁	
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //获取锁成功,补偿中断
                        if (interrupted)
    编程客栈                        selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //通过interrupted记录中断信息
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireShared方法没有返回值,与acquireQueued不同的是:

  • doAcquireShared没有返回值,该方法的中断补偿是在方法内完成的,获取锁成功之后,会判断中断信息interrupted的状态,如果为true则调用selfInterrupt()方法中断当前线程;
  • 获取锁成功之后不是简单的设置head,而是通过setHeadAndPropagate方法来设置头结点和并且判断后继节点的信息,对后编程客栈继节点中的线程进行唤醒操作等,setHeadAndPropagate方法源码如下所示:
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        //设置新的头结点
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果后继节点为空或者为SHARED类型的节点,执行doReleaseShared方法
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                	//状态为SIGNAL,则唤醒后继节点中的线程
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h);
                }
            python    //若状态为0,则设置状态为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            if (h == head)                  
                break;
        }
    }

锁的释放

锁的释放也分为释放排他锁和释放共享锁,分别为release方法和releaseShared方法,源码如下所示,

	//释放排他锁
    public final boolean release(int arg) {
    	//释放锁,然后唤醒后继节点的线程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


    //释放共享锁
    public final boolean releaseShared(int arg) {
    	//释放锁,然后调用doReleaseShared方法
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

release方法和releaseShared方法分别调用模板方法tryRelease和tryReleaseShared来释放锁,release方法中直接通过调用unparkSuccessor唤醒后继线程,而releaseShared的唤醒操作在doReleaseShared方法中进行。

取消获取锁

当获取锁失败时,需要进行一些状态清理和变化,cancelAcquire方法就是用来实现这些功能的,其源码如下所示,

    private void cancelAcquire(Node node) {
       
        if (node == null)
            return;
        //节点线程置为null
        node.thread = null;

        //从CLH队列中清除已经取消的节点(CANCELLED)
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        //判断如果node是尾部节点,则设置尾部节点
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
           	//若不是头节点则直接从CLH队列中清除当前节点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            //若为头结点,则唤醒后继节点中的线程
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

取消获取锁的操作首先将队列中处于CANCELLED状态的节点剔除,然后根据当前节点在CLH队列中的位置进行不同的操作:

  • node在队列尾部,则重新设置CLH队列的尾部节点;
  • node为头结点,唤醒后继节点中的线程;
  • node既不是头结点也不是尾节点,则在CLH中剔除node。

总结

AQS是整个concurrent包的基础,可重入锁、线程池、信号量(Semaphore)等同步工具类都需要借助AQS来完成,了解AQS是深入学习concurrent包的前提。

以上就是Japythonva AQS(AbstractQueuedSynchronizer)源码解析的详细内容,更多关于Java AQS源码的资料请关注我们其它相关文章!

0

精彩评论

暂无评论...
验证码 换一张
取 消