前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于redis实现的分布式锁

基于redis实现的分布式锁

原创
作者头像
一个风轻云淡
发布2023-12-05 21:03:17
3690
发布2023-12-05 21:03:17
举报
文章被收录于专栏:java学习javajava学习java

基本实现

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。

添加描述

  • 1. 多个客户端同时获取锁(setnx)
  • 2. 获取成功,执行业务逻辑,执行完成释放锁(del)
  • 3. 其他客户端等待重试

改造StockService方法:

代码语言:javascript
复制
@Service
public class StockService {
 ? ?@Autowired
 ? ?private StockMapper stockMapper;
 ? ?@Autowired
 ? ?private LockMapper lockMapper;
 ? ?@Autowired
 ? ?private StringRedisTemplate redisTemplate;
 ? ?public void checkAndLock() {
 ? ? ? ?// 加锁,获取锁失败重试
 ? ? ? ?while (!this.redisTemplate.opsForValue().setIfAbsent("lock", 

"xxx")){
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?Thread.sleep(100);
 ? ? ? ? ? } catch (InterruptedException e) {
 ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? }
 ? ? ? }
 ? ? ? ?// 先查询库存是否充足
 ? ? ? ?Stock stock = this.stockMapper.selectById(1L);
 ? ? ? ?// 再减库存
 ? ? ? ?if (stock != null && stock.getCount() > 0){
 ? ? ? ? ? ?stock.setCount(stock.getCount() - 1);
 ? ? ? ? ? ?this.stockMapper.updateById(stock);
 ? ? ? }
 ? ? ? ?// 释放锁
 ? ? ? ?this.redisTemplate.delete("lock");
 ? }
}

其中,加锁:

代码语言:javascript
复制
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx")){
 ? ?try {
 ? ? ? ?Thread.sleep(100);
 ? } catch (InterruptedException e) {
 ? ? ? ?e.printStackTrace();
 ? }
}

解锁:

代码语言:javascript
复制
// 释放锁

this.redisTemplate.delete("lock");

使用Jmeter压力测试如下:

?查看mysql数据库:

防死锁

?解决:给锁设置过期时间,自动释放锁。 设置过期时间两种方式:

1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

压力测试肯定也没有问题。

问题:可能会释放其他服务器的锁。 场景:如果业务逻辑的执行时间是7s。执行流程如下

1. index1业务逻辑没执行完,3秒后锁被自动释放。

2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

3. index3获取到锁,执行业务逻辑

4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的 锁

防误删

实现如下:

问题:删除操作缺乏原子性。 场景:

1. index1执行删除时,查询到的lock值确实和uuid相等

2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

3. index2获取了lock 4. index1执行删除,此时会把index2的lock删除

解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)

使用lua保证删除原子性

删除LUA脚本:

代码语言:javascript
复制
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', 
KEYS[1]) else return 0 end

代码实现:

代码语言:javascript
复制
public void checkAndLock() {
 ? ?// 加锁,获取锁失败重试
 ? ?String uuid = UUID.randomUUID().toString();
 ? ?while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, 

TimeUnit.SECONDS)){
 ? ? ? ?try {
 ? ? ? ? ? ?Thread.sleep(50);
 ? ? ? } catch (InterruptedException e) {
 ? ? ? ? ? ?e.printStackTrace();
 ? ? ? }
 ? }
 ? ?// 先查询库存是否充足
 ? ?Stock stock = this.stockMapper.selectById(1L);
 ? ?// 再减库存
 ? ?if (stock != null && stock.getCount() > 0){
 ? ? ? ?stock.setCount(stock.getCount() - 1);
 ? ? ? ?this.stockMapper.updateById(stock);
 ? }
 ? ?// 释放锁
 ? ?String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return 
redis.call('del', KEYS[1]) else return 0 end";
 ? ?this.redisTemplate.execute(new DefaultRedisScript<>(script, 

Long.class), Arrays.asList("lock"), uuid);
}

?压力测试:

可重入锁?

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代 码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继 续往下执行。

用一段 Java 代码解释可重入:

代码语言:javascript
复制
public synchronized void a() {
 ? ?b();
}

public synchronized void b() {
 ? ?// pass
}

假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。

锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己~

可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加 1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释 放。 可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

解决方案:redis + Hash

加锁脚本

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的 锁的重入次数,然后利用 lua 脚本判断逻辑。

代码语言:javascript
复制
if (redis.call('exists', KEYS[1]) == 0 or 
    redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
 ? ?redis.call('hincrby', KEYS[1], ARGV[1], 1);
 ? ?redis.call('expire', KEYS[1], ARGV[2]);
 ? ?return 1;
else
 return 0;
end

假设值为:KEYS:[lock], ARGV[uuid, expire]如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次 数加1。

解锁脚本
代码语言:javascript
复制
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then 
 ? ?return nil; 
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then 
 ? ?return 0; 
else 
 ? ?redis.call('del', KEYS[1]); 
 ? ?return 1; 
end;

这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:

1 代表解锁成功,锁被释放

0 代表可重入次数被减 1

null 代表其他线程尝试解锁,解锁失败

代码实现

由于加解锁代码量相对较多,这里可以封装成一个工具类:

?具体实现:

代码语言:javascript
复制
public class RedisDistributeLock{


    private StringRedisTemplate redisTemplate;

    //线程局部变量,可以在线程内共享参数
    private String lockName;
    private String static uuid;
    private Integer expire = 30;
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (StringUtils.isBlank(uuid)) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
    }

    public void lock() {
        this.lock(expire);
    }

    public void lock(Integer expire) {
        this.expire = expire;
        String script = "if (redis.call('exists', KEYS[1]) == 0 or" +
                "redis.call('hexists', KEYS[1], ARGV[1]) == 1)" +
                "        then" +
                "  redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
                "  redis.call('expire', KEYS[1], ARGV[2]);" +
                "  return 1;" +
                "else" +
                "        return 0;" +
                " end";
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),
                Arrays.asList(lockName), uuid, expire.toString())) {
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //没有获取到锁重试
            lock(expire);
        }
    }

    public void unlock() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then" +
                "  return nil; " +
                "elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then" +
                "  return 0; " +
                "else" +
                "  redis.call('del', KEYS[1]);" +
                "  return 1;" +
                "end;";
        //如果返回值没有使用Boolean,Spring-data-redis 进行类型转换时将会把 null
        //转为 false,这就会影响我们逻辑判断
        //所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功。
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>
                (script, Long.class), Arrays.asList(lockName), uuid);
        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }

}
使用及测试?

在业务代码中使用:

代码语言:javascript
复制
public void checkAndLock() {
 ? ?// 加锁,获取锁失败重试
 ? ?RedisDistributeLock lock = new RedisDistributeLock(this.redisTemplate, 
"lock");
 ? ?lock.lock();
 ? ?// 先查询库存是否充足
 ? ?Stock stock = this.stockMapper.selectById(1L);
 ? ?// 再减库存
 ? ?if (stock != null && stock.getCount() > 0){
 ? ? ? ?stock.setCount(stock.getCount() - 1);
 ? ? ? ?this.stockMapper.updateById(stock);
 ? }
 ? ?// this.testSubLock();
 ? ?// 释放锁
 ? ?lock.unlock();
}

?测试:

?测试可重入性:

?自动续期

lua脚本:

代码语言:javascript
复制
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then 
 ? ?redis.call('expire', KEYS[1], ARGV[2]); 
 ? ?return 1; 
else 
 ? ?return 0; 
end

?在RedisDistributeLock中添加renewExpire方法:

代码语言:javascript
复制
    private static final Timer TIMER = new Timer();

    /**
     * 开启定时器,自动续期
     */
    private void renewExpire() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then " +
                "redis.call('expire', KEYS[1], ARGV[2]); " +
                "return 1; " +
                "else " +
                "return 0; end";
        TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                //如果uuid为空,则终止定时任务
                if (StringUtils.isNotBlank(uuid)) {
                    redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),
                            Arrays.asList(lockName), RedisDistributeLock.this.uuid,
                            expire.toString());
                    renewExpire();
                }
            }
        },expire * 1000 / 3);
    }

?在lock方法中使用:

?在unlock方法中添加红框中的代码:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本实现
  • 防误删
  • 使用lua保证删除原子性
  • 可重入锁?
    • 加锁脚本
      • 解锁脚本
        • 代码实现
          • 使用及测试?
          • ?自动续期
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
          http://www.vxiaotou.com