开发者

ReentrantLock从源码解析Java多线程同步学习

开发者 https://www.devze.com 2023-04-20 10:33 出处:网络 作者: 码下客
目录前言管程管程模型MESA模型主要特点AQS共享变量资源访问方式主要方法队列node节点等待状态ReentrantLock源码分析实例化ReentrantLock加锁A线程加锁成功B线程尝试加锁释放锁总结前言
目录
  • 前言
  • 管程
    • 管程模型
  • MESA模型
    • 主要特点
  • AQS
    • 共享变量
    • 资源访问方式
    • 主要方法
    • 队列
    • node节点等待状态
  • ReentrantLock源码分析
    • 实例化ReentrantLock
    • 加锁
    • A线程加锁成功
    • B线程尝试加锁
  • 释放锁
    • 总结

      前言

      如今多线程编程已成为了现代软件开发中的重要部分,而并编程客栈发编程中的线程同步问题更是一道难以逾越的坎。在Java语言中,synchronized是最基本的同步机制,但它也存在着许多问题,比如可重入性不足、死锁等等。为了解决这些问题,Java提供了更加高级的同步机制——ReentrantLock。

      管程

      管程(Monitor)是一种用于实现多线程同步的抽象数据类型,它可以用来协调不同线程之间的互斥和同步访问共享资源。通俗地说,管程就像一个门卫,控制着进入某个共享资源区域的线程数量和时间,以避免多个线程同时访问导致的数据竞争和混乱。

      管程模型

      • Mesa管程模型:由美国计算机科学家Dijkstra提出,是最流行的管程模型之一。在Mesa管程模型中,每个管程也有一个条件变量和等待队列,但与Hoare管程不同的是,当一个线程请求进入管程时,如果条件不满足,该线程并不会立即被阻塞,而是继续执行后续操作,直到该线程主动放弃锁资源或者其他线程唤醒它。
      • Hoare管程模型:由英国计算机科学家C.A.R. Hoare提出,是最早的管程模型之一。在Hoare管程模型中,每个管程都有一个条件变量和一个等待队列,当一个线程请求进入管程时,如果条件不满足,该线程就会被阻塞并加入等待队列,直到条件满足后才被唤醒。
      • Brinch Hansen管程模型:由丹麦计算机科学家Per Brinch Hansen提出,是一种改进的管程模型。在Brinch Hansen管程模型中,每个管程也有一个条件变量和等待队列,但与其他管程模型不同的是,它允许多个线程同时在管程中等待,并且不需要像Hoare管程那样每次只唤醒一个等待线程。

      在Java中,采用的是基于Mesa管程模型实现的管程机制。具体地,Java中的synchronized关键字就是基于Mesa管程模型实现的,包括Java中的AbstractQueuedSynchronizer(AQS)可以被看作是一种基于管程模型实现的同步框架。

      MESA模型

      主要特点

      • 互斥访问

      MESA模型采用了互斥访问的机制,即同一时刻只能有一个线程进入管程执行代码。

      • 条件变量

      MESA模型还引入了条件变量的概念,用于实现线程间的等待和唤醒操作。条件变量提供了一种机制,使得线程可以在等待某个条件成立时挂起,并在条件成立时被唤醒。

      • 等待队列

      MESA模型使用等待队列来维护处于等待状态的线程,这些线程都在等待条件变量成立。等待队列由一个或多个条件变量组成,每个条件变量都有自己的等待队列。

      • 原子操作

      MESA模型要求管程中的所有操作都是原子操作,即一旦进入管程,就不能被中断,直到操作执行完毕。

      ReentrantLock从源码解析Java多线程同步学习

      AQS

      在讲ReentrantLock之前先说一下AQS,AQS(AbstractQueuedSynchronizer)是Java中的一个同步器,它是许多同步类(如ReentrantLock、Semaphore、CountDownLatch等)的基础。AQS提供了一种实现同步操作的框架,其中包括独占模式和共享模式,以及一个等待队列来管理线程的等待和唤醒。AQS也借鉴了Mesa模型的思想。

      共享变量

      AQS内部维护了属性volatile int state表示资源的可用状态

      state三种访问方式:

      • getState()
      • setState()
      • compareAndSetState()

      资源访问方式

      Exclusive-独占,只有一个线程能执行,如ReentrantLock

      Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

      主要方法

      • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
      • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
      • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
      • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
      • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

      队列

      • 同步等待队列: 主要用于维护获取锁失败时入队的线程。
      • 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁。

      node节点等待状态

      • 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
      • CANCELLED,值为1,表示当前的线程被取消;
      • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
      • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
      • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

      ReentrantLock源码分析

      在ReentrantLock中有一个内部类Sync会继承 AQS然后将同步器所有调用都映射到Sync对应的方法。

      实例化ReentrantLock

      /**
       * Creates an instance of {@code ReentrantLock}.
       * This is equivalent to using {@code ReentrantLock(false)}.
       */
      public ReentrantLock() {
          sync = new NonfairSync();
      }
      /**
       * Creates an instance of {@code ReentrantLock} with the
       * given fairness policy.
       *
       * @param fair {@code true} if this lock should use a fair ordering policy
       */
      public ReentrantLock(boolean fair) {
          sync = fair ? new FairSync() : new NonfairSync();
      }
      

      ReentrantLock还提供了一个传布尔值的实例化方式,这个传true用来创建一个公平锁的,默认是创建非公平锁。非公平锁的话 sync是用NonfairSync来进行实例化,公平锁sync是用FairSync来进行实例化。

      加锁

      现在假设有AB两个线程来竞争锁

      A线程加锁成功

      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = 7316153563782823691L;
      
          /**
           * Performs lock.  Try immediate barge, backing up to normal
           * acquire on failure.
           */
          final void lock() {
              //CAS修改state状态
              if (compareAndSetState(0, 1))
                 //修改成功设置exclusiveOwnerThread
                  setExclusiveOwnerThread(Thread.currentThread());
              else
                 //尝试获取资源
                  acquire(1);
          }
      
          protected final boolean tryAcquire(int acquires) {
              return nonfairTryAcquire(acquires);
          }
      }
      

      假定A线程先CAS修改成功,他会设置exclusiveOwnerThread为A线程

      ReentrantLock从源码解析Java多线程同步学习

      B线程尝试加锁

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addwaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      

      我们先看tryAcquire()方法,这里体现出了他的可重入性。

      final boolean nonfairTryAcquire(int acquires) {
          final Thread current = Thread.currentThread();
          //获取当前资源标识
          int c = getState();
          if (c == 0) {
              //如果资源没被占有CAS尝试加锁
              if (compareAndSetState(0, acquires)) {
                  //修改成功设置exclusiveOwnerThread
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          //资源被占有要判断占有资源的线程是不是当前线程,加锁成功设置的exclusiveOwnerThread值在这里就派上了用处
          else if (current == getExclusiveOwnerThread()) {
              //这下面就是将重入次数设置到资源标识里
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
      

      根据上面源码我们可以看出B线程尝试加锁是失败的,接下来看尝试加锁失败后的方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),该方法实现分为两个部分:

      • addWaiter(Node.EXCLUSIVE):该方法会将当前线程加入到等待队列(即 Sync 中的 queue)的尾部,并返回该节点。这个节点的模式是独占模式(Node.EXCLUSIVE),表示当前线程想要获取独占锁。
      • acquireQueued(Node node, int arg):该方法是一个循环方法,用于等待和获取锁。

      我们先解析addWaiter

      private Node addWaiter(Node mode) {
          //构建一个当前线程的node节点 这里prev 和 next 都为null
          Node node = new Node(Thread.currentThread(), mode);
          // 指向双向链表的尾节点的引用
          Node pred = tail;
          //B线程进来目前还未构建任何队列这里肯定是空的
          if (pred != null) {
              //如果已经构建过队列会把当前线程的node节点的上一个node节点指向tail尾节点
              node.prev = pred;
              //CAS操作把当前线程的node节点设置为新的tail尾节点
              if (compareAndSetTail(pred, node)) {
                  //把旧的tail尾节点的下一个node节点指向当前线程的node节点
                  pred.next = node;
                  return node;
              }
          }
          //尾节点为空执行
          enq(node);
          return node;
      }
      
      private Node enq(final Node node) {
          //死循环当tail 尾节点不为空才会跳出
          for (;;) {
              Node t = tail;
              if (t == null) { // Must initialize
                  //用CAS构建出一个head空的noandroidde节点
                  if (compareAndSetHead(new Node()))
                  //将当前空的node节点交给尾节点(下一次循环就会走else分支)
                      tail = head;
              } else {
                  //把我们addWaiter中创建的node节点的prev指向了当前线程node节点
                  node.prev = t;
                  //将tail尾节点更改为当前线程的node节点
                  if (compareAndSetTail(t, node)) {
                      //将t的下一个节点指向当前线程创建的node节点
                      t.next = node;
                      return t;
                  }
              }
          }
      }
      

      执行完这里初步的一个等待队列就构建好了

      ReentrantLock从源码解析Java多线程同步学习

      解析完addWaiter我们再来解析acquireQueued,addWaiter执行完后的结果会返回一个双向列表的node节点

      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              //中断标志位
              boolean interrupted = false;
              for (;;) {
                  //获取当前线程node节点的上一个节点
                  final Node p = node.predecessor();
                  //如果上一个节点就是head节点说明当前线程其实是处在队列第一位然后就会再次尝试加锁
                  if (p =js= head && tryAcquire(arg)) {
                      setHead(node);
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  //这里是重点 这个方法来判断当前线程是否应该进入等待状态
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      //调用LockSupport.park(this)阻塞了当前线程等待被唤醒
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          //获取上一个节点的等待状态
          int ws = pred.waitStatus;
          if (ws == Node.SIGNAL)//表示当前节点的后继节点包含的线程需要运行
              /*
               * This node has already set status asking a release
               * to signal it, so it can safely park.
               */
              return true;
          if (ws > 0) {//当前线程被取消
              /*
               * Predecessor was cancelled. Skip over predecessors and
               * indicate retry.
               */
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;
          } else {
              /*
               * waitStatus must be 0 or PROPAGATE.  Indicate that we
               * need a signal, but don't park yet.  Caller will need to
               * retry to make sure it cannot acquire before parking.
               */
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//设置等待状态为-1
          }
          return false;
      }
      

      运行到parkAndCheckInterrupt()B线程就会被阻塞了,后续的逻辑我们在解锁操作unlock之后再继续说

      ReentrantLock从源码解析Java多线程同步学习

      释放锁

      public void unlock() {
          sync.release(1);
      }
      
      public final boolean release(int arg) {
         //尝试释放锁
          if (tryRelease(arg)) {
              Node h = head;
              if (h != null && h.waitStatus != 0)
                  //head节点不为空并且等待状态不是0就去进行unpark操作
                  unparkSuccessor(h);
              return true;
          }
          return false;
      }
      
      private void unparkSuccessor(Node node) {
          /*
           * If status is negative (i.e., possibly needing signal) try
           * to clear in anticip编程ation of signalling.  It is OK if this
           * fails or if status is changed by waiting thread.
           */
          //head节点等待状态
          int ws = node.waitStatus;
          //
          if (ws < 0)
             //将头节点的等待状态修改为0
              compareAndSetWaitStatus(node, ws, 0);
      
          /*
           * Thread to unpark is held in successor, which is normally
           * just the next node.  But if cancelled or apparently null,
           * traverse backwards from tail to find the actual
           * non-cancelled successor.
           */
          //获取头节点的下一个节点(也就是B节点)
          Node s = node.next;
          if (s == null || s.waitStatus > 0) {//节点为空或者线程处于取消状态
              s = null;
              //从尾节点往上找符合条件的节点
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)
                      s = t;
          }
          if (s != null)
              //对该线程进行unpark唤醒(B节点)
              LockSupport.unpark(s.thread);
      }
      

      ReentrantLock从源码解析Java多线程同步学习

      唤醒之后我们的B线程就能继续往下走了,我们继续看刚刚的acquireQu开发者_开发教程eued()方法

      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              boolean interrupted = false;
              for (;;) {
               javascript   final Node p = node.predecessor();
                  //这里尝试获取锁由于A线程释放了锁这里是肯定获取成功的
                  if (p == head && tryAcquire(arg)) {
                      //把head设置为当前节点(也就是往前移一位,并且把上一个节点指向指为null)
                      setHead(node);
                      //把刚刚的上一个节点也指向为null (这里他就没引用了会被GC回收掉)
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      //刚刚在这里阻塞现在被唤醒
                      parkAndCheckInterrupt())
                      //设置标志中断位为true 然后开始下一次循环
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      

      总结

      • 锁的实现方式:ReentrantLock内部通过维护一个state变量来表示锁的状态,其中高16位表示持有锁的线程ID,低16位表示重入次数。使用CAS操作来获取锁,如果当前锁未被持有,则将state的高16位设置为当前线程ID,并将低16位设置为1,表示重入次数为1;如果当前锁已经被持有,则判断持有锁的线程是否为当前线程,如果是,则将state的低16位加1表示重入,如果不是,则进入等待队列。
      • 等待队列:ReentrantLock中的等待队列采用了CLH队列的实现方式,每个等待线程会被封装成一个Node节点,节点中维护了前继节点、后继节点和等待状态等信息。当一个线程需要等待锁时,会将自己封装成一个Node节点插入到等待队列的尾部,并在自旋等待时自动阻塞。
      • 公平锁与非公平锁:ReentrantLock可以通过构造函数中的fair参数来指定锁的公平性,当fair为true时表示该锁是公平锁,即等待队列中的线程会按照先进先出的顺序获取锁;当fair为false时表示该锁是非公平锁,即等待队列中的线程会按照随机顺序获取锁。
      • 锁的释放:ReentrantLock中的锁释放采用了state变量的递减来实现,当一个线程释放锁时,会将state的低16位减1,如果减1后低16位变为0,则表示当前线程已经完全释放了锁,此时会将高16位清零,表示锁变为了可获取状态,等待队列中的线程可以继续竞争锁。

      以上就是ReentrantLock从源码解析Java多线程同步学习的详细内容,更多关于Java多线程ReentrantLock的资料请关注我们其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号