开发者

Redis分布式锁之红锁的实现

开发者 https://www.devze.com 2022-12-08 11:11 出处:网络 作者: 姜秀丽
目录一、问题二、办法三、原理四、实战一、问题 分布式锁,当我们请求一个分布式锁的时候,成功了,但是这时候slave还没有复制我们的锁,masterDown了,我们的应用继续请求锁的时候,会从继任了master的原slave上申请
目录
  • 一、问题
  • 二、办法
  • 三、原理
  • 四、实战

一、问题

分布式锁,当我们请求一个分布式锁的时候,成功了,但是这时候slave还没有复制我们的锁,masterDown了,我们的应用继续请求锁的时候,会从继任了master的原slave上申请,也会成功。

这就会导致,同一个锁被获取了不止一次。

二、办法

Redis中针对此种情况,引入了红锁的概念。

三、原理

用Redis中的多个master实例,来获取锁,只有大多数实例获取到了锁,才算是获取成功。具体的红锁算法分为以下五步:

  • 获取当前的时间(单位是毫秒)。
  • 使用相同的key和随机值在N个节点上请求锁。这里获取锁的尝试时间要远远小于锁的超时时间,防止某个masterDown了,我们还在不断的获取锁,而被阻塞过长的时间。
  • 只有在大多数节点上获取到了锁,而且总的获取时间小于锁的超时时间的情况下,认为锁获取成功了。
  • 如果锁获取成功了,锁的超时时间就是最初的锁超时时间进去获取锁的总耗时时间。
  • 如果锁获取失败了,不管是因为获取成功的节点的数目没有过半,还是因为获取锁的耗时超过了锁的释放时间,都会将已经设置了key的master上的key删除。

四、实战

Redission就实现了红锁算法,使用的步骤如下:

1、引入maven

<!-- JDK 1.8+ compatible -->
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.9.0</version>
</dependency> 

2、引入代码

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new xWzESKGZConfig();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient rediwww.devze.comssonClient3 = Redisson.create(config3);

/**
* 获python取多个 RLock 对象
*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);

/**
* 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

try {
  /**
  * 4.尝试获取锁
  * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
  * leaseTime  锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
  */
  boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
  if (res) {
    //成功获得锁,在这里处理业务
  }
} catch (Exception e) {
  throw new RuntimeException("aquire lock fail");
}finally{
  //无论如何, 最后都要解锁
  redLock.unlock();
}

3、核心源码

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long newLeaseTime = -1;
  if (leaseTime != -1) {
    newLeaseTime = unit.toMillis(waitTime)*2;
  }
 
  long time = System.currentTimeMillis();
  long remainTime = -1;
  if (waitTime != -1) {
    remainTime = unit.toMillis(waitTime);
  }
  long lockWaitTime = calcLockWaitTime(remainTime);
  /**
  * 1. 允许加锁失败节点个数限制(N-(N/2+1))
  */
  int failedLocksLimit = failedLocksLimit();
  /**
  * 2. 遍历所有节点通过EVAL命令执行Lua加锁
  */
  List<RLock> acquiredLocks = new ArrayList<>(locks.size());
  for (ListIterator<RLock> iterator = locks.listIterator(); iterator开发者_mssql2008.hasNext();) {
    RLock lock = iterator.next();
    boolean lockAcquired;
    /**
    * 3.编程客栈对节点尝试加锁
    */
    try {
      if (waitTime == -1 && leaseTime == -1) {
        lockAcquired = lock.tryLock();
      } else {
        long awaitTime = Math.min(lockWaitTime, remainTime);
        lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
      }
    } catch (RedisResponseTimeoutException e) {
      // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
      unlockInner(Arrays.asList(lock));
      lockAcquired = false;
    } catch (Exception e) {
      // 抛出异常表示获取锁失败
      lockAcquired = false;
    }
   
    if (lockAcquired) {
      /**
      *4. 如果获取到锁则添加到已获取锁集合中
      */
      acquiredLocks.add(lock);
    } else {
      /**
      * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
      * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
      * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
      */
      if (locks.size() - acquiredLocks.size() == failedLocksLimi编程客栈t()) {
        break;
      }

      if (failedLocksLimit == 0) {
        unlockInner(acquiredLocks);
        if (waitTime == -1 && leaseTime == -1) {
          return false;
        }
        failedLocksLimit = failedLocksLimit();
        acquiredLocks.clear();
        // reset iterator
        while (iterator.hASPrevious()) {
          iterator.previous();
        }
      } else {
        failedLocksLimit--;
      }
    }

    /**
    * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
    */
    if (remainTime != -1) {
      remainTime -= System.currentTimeMillis() - time;
      time = System.currentTimeMillis();
      if (remainTime <= 0) {
        unlockInner(acquiredLocks);
        return false;
      }
    }
  }

  if (leaseTime != -1) {
    List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
    for (RLock rLock : acquiredLocks) {
      RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
      futures.add(future);
    }
   
    for (RFuture<Boolean> rFuture : futures) {
      rFuture.syncUninterruptibly();
    }
  }

  /**
  * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
  */
  return true;
}

到此这篇关于Redis分布式锁之红锁的实现的文章就介绍到这了,更多相关Redis 红锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号