前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang+Redis分布式互斥锁

Golang+Redis分布式互斥锁

原创
作者头像
lestat
修改2021-05-05 09:20:52
3.1K0
修改2021-05-05 09:20:52
举报
文章被收录于专栏:lestat's bloglestat's blog

引言

假设我们的某个业务会涉及数据更新,同时在实际场景中有较大并发量。流程:读取->修改->保存,在不考虑基于DB层的并发处理情况下,这种场景可能对部分数据造成不可预期的执行结果,此时可以考虑使用分布式锁来解决该问题

需要解决的问题

  1. 锁的误解除
  2. 业务执行超时导致并发
  3. 重试机制
  4. GETDEL非原子性

代码

目录结构:

代码语言:txt
复制
│  main.go
│
└─demo
        lock.go

lock.go:

代码语言:txt
复制
package demo

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"math/rand"
	"time"
)

// 重试次数
var retryTimes = 5

// 重试频率
var retryInterval = time.Millisecond * 50

var rdb = redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	Password: "", // no password set
	DB:       0,  // use default DB
})

// 锁的默认过期时间
var expiration time.Duration

// 模拟分布式业务加锁场景
func MockTest(tag string) {
	var ctx, cancel = context.WithCancel(context.Background())

	defer func() {
		// 停止goroutine
		cancel()
	}()

	// 随机value
	lockV := getRandValue()

	lockK := "EXAMPLE_LOCK"

	// 默认过期时间
	expiration = time.Millisecond * 200

	fmt.Println(tag + "尝试加锁")

	set, err := rdb.SetNX(ctx, lockK, lockV, expiration).Result()

	if err != nil {
		panic(err.Error())
	}

	// 加锁失败,重试
	if set == false && retry(ctx, rdb, lockK, lockV, expiration, tag) == false {
		fmt.Println(tag + " server unavailable, try again later")
		return
	}

	fmt.Println(tag + "成功加锁")

	// 加锁成功,新增守护线程
	go watchDog(ctx, rdb, lockK, expiration, tag)

	// 处理业务(通过随机时间延迟模拟)
	fmt.Println(tag + "等待业务处理完成...")
	time.Sleep(getRandDuration())

	// 业务处理完成
	// 释放锁
	val := delByKeyWhenValueEquals(ctx, rdb, lockK, lockV)
	fmt.Println(tag+"释放结果:", val)
}

// 释放锁
func delByKeyWhenValueEquals(ctx context.Context, rdb *redis.Client, key string, value interface{}) bool {
	lua := `
-- 如果当前值与锁值一致,删除key
if redis.call('GET', KEYS[1]) == ARGV[1] then
	return redis.call('DEL', KEYS[1])
else
	return 0
end
`
	scriptKeys := []string{key}

	val, err := rdb.Eval(ctx, lua, scriptKeys, value).Result()
	if err != nil {
		panic(err.Error())
	}

	return val == int64(1)
}

// 生成随机时间
func getRandDuration() time.Duration {
	rand.Seed(time.Now().UnixNano())
	min := 50
	max := 100
	return time.Duration(rand.Intn(max-min)+min) * time.Millisecond
}

// 生成随机值
func getRandValue() int {
	rand.Seed(time.Now().UnixNano())
	return rand.Int()
}

// 守护线程
func watchDog(ctx context.Context, rdb *redis.Client, key string, expiration time.Duration, tag string) {
	for {
		select {
		// 业务完成
		case <-ctx.Done():
			fmt.Printf("%s任务完成,关闭%s的自动续期\n", tag, key)
			return
			// 业务未完成
		default:
			// 自动续期
			rdb.PExpire(ctx, key, expiration)
			// 继续等待
			time.Sleep(expiration / 2)
		}
	}
}

// 重试
func retry(ctx context.Context, rdb *redis.Client, key string, value interface{}, expiration time.Duration, tag string) bool {
	i := 1
	for i <= retryTimes {
		fmt.Printf(tag+"第%d次尝试加锁中...\n", i)
		set, err := rdb.SetNX(ctx, key, value, expiration).Result()

		if err != nil {
			panic(err.Error())
		}

		if set == true {
			return true
		}

		time.Sleep(retryInterval)
		i++
	}
	return false
}

流程说明

假设MockTest方法就是业务处理方法

  1. 初始化context用于控制守护线程的退出
  2. 设置随机值尝试加锁(随机值在释放锁时可避免误释放)
  3. 如果加锁不成功,尝试重试,重试机制根据业务而定,重试失败处理根据业务而定
  4. 成功加锁后开启一个守护线程(watchDog),用于持续刷新锁的过期时间,保证在业务执行过程中锁不会过期
  5. 模拟业务处理随机耗时
  6. 业务处理完成后释放锁(lua处理保证原子性,并对比value避免误释放)
  7. 通过cancel关闭守护线程(watchDog),避免死锁

应对场景

  1. 线程获取到锁后异常终止,锁会在expire到期后自动释放
  2. 线程执行时间超出锁的默认expire,通过watchDog自动续期,避免该情况发生

测试

main.go:

代码语言:txt
复制
package main

import (
	"play/demo"
	"time"
)

func main() {
	go demo.MockTest("A")
	go demo.MockTest("B")
	go demo.MockTest("C")
	go demo.MockTest("D")
	go demo.MockTest("E")
	// 用于测试goroutine接收到ctx.Done()信号后的打印
	time.Sleep(time.Second * 2)
}

结果:

代码语言:txt
复制
$ go run main.go
A尝试加锁
D尝试加锁
E尝试加锁
B尝试加锁
C尝试加锁
D成功加锁
D等待业务处理完成...
B第1次尝试加锁中...
E第1次尝试加锁中...
A第1次尝试加锁中...
C第1次尝试加锁中...
B第2次尝试加锁中...
D释放结果: true
B成功加锁
E第2次尝试加锁中...
B等待业务处理完成...
C第2次尝试加锁中...
A第2次尝试加锁中...
D任务完成,关闭EXAMPLE_LOCK的自动续期
A第3次尝试加锁中...
C第3次尝试加锁中...
E第3次尝试加锁中...
B释放结果: true
A成功加锁
A等待业务处理完成...
B任务完成,关闭EXAMPLE_LOCK的自动续期
E第4次尝试加锁中...
C第4次尝试加锁中...
A释放结果: true
A任务完成,关闭EXAMPLE_LOCK的自动续期
C第5次尝试加锁中...
E第5次尝试加锁中...
C成功加锁
C等待业务处理完成...
E server unavailable, try again later
C释放结果: true
C任务完成,关闭EXAMPLE_LOCK的自动续期

偷懒就没写单元测试了??

博客原文

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 需要解决的问题
  • 代码
  • 流程说明
  • 应对场景
  • 测试
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com