周末了,这周遇到个问题当时没想明白,周末整理下
题目有点绕口 在现实的项目中这么搞的也不常见,里面牵涉多个知识点,整理下就当学习了。
程序需求:
1:接收任务,从异步消息队列里面监控接收最新的任务消息
2:处理任务,每个任务耗时可能不定
我们常规的做法就是开启一个长监听串行化来一个执行一个,实在不行就多开几个,这种呢人工干预比较重,有时候还盯着不是太好。
程序方案:
1:异步接受消息 ,开启一个协程接受消息 这个不用多开接收消息不会成为瓶颈
2:异步处理消息,开启协程异步处理对应的消息,这里有点要注意是一个消息就开一个处理协程 还是多个消息开启,是值得思考的。
3:批量处理,多个消息开启一个处理协程,防止开启过多的协程,
4:超时处理,如果长时间没有达到批量处理的数量限制,那么也要及时处理
5:限制过多的协程,这个其实没有什么必要,因为go所谓百万协程性能但是既然搞了这个例子 那就完善下吧
代码实现:
package main
import (
"errors"
"fmt"
"math/rand"
"runtime"
"strconv"
"time"
)
var msgChanLimit = 20 //消息管道大小限制
var handleId int //协程任务自定义id 用来区分 查看对应的任务批次 每处理一次 ++
/*
periodType 消息产生时间
1 1秒一个信息会走到超时处理模块;
2 1毫秒一个信息 会一直走正常处理模块;
3 随机0秒到 1秒 模拟都会触发的情况
*/
var periodType = 3
var batchNum = 6 // 最多堆积多少未处理信息 进行一批次处理
var batchTime = 3 // 在未满消息 最长多久 进行一批次处理
var maxGoroutines = 10 //最大的协程数量 这个其实感觉意义不大 但是还是搞一个 边界预防吧
//接受消息
func getMsg(msgChan chan string) {
msgId := 0
for { //循环接收消息
msgId++
msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05")
msgChan <- msg
switch periodType {
case 1:
time.Sleep(1 * time.Second) // 秒
case 2:
time.Sleep(1 * time.Millisecond) // 毫秒
case 3:
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // 随机
}
}
}
// 处理消息
func handleMsg(model int, handleId int, msgSet []string, guard chan struct{}) {
for i, v := range msgSet {
fmt.Println("HandleId:", handleId, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v)
time.Sleep(1500 * time.Millisecond) //模拟具体处理消息是有耗时的 间隔越大 那么多个后台会有越多的挂起的处理
}
<-guard //释放一个限量
}
//无阻塞去接受消息
func unBlockRead(ch chan string) (msg string, err error) {
select {
case msg = <-ch:
return msg, nil
case <-time.After(time.Microsecond):
return "", errors.New("nil")
}
}
func main() {
guard := make(chan struct{}, maxGoroutines) //守护协程数量限制
msgChan := make(chan string, msgChanLimit) //接受消息channel大小
go getMsg(msgChan) // 开始接收消息
msgSet := make([]string, 0) // 临时存放接收到的消息集合
step := 0 //秒计数器 对应秒
for { //主逻辑处理 开始处理
if msg, err := unBlockRead(msgChan); err == nil { //接收到消息
msgSet = append(msgSet, msg)
if len(msgSet) == batchNum { //达到处理数量
handleId++
guard <- struct{}{}
go handleMsg(1, handleId, msgSet, guard) // 处理当前的msgSet
msgSet = nil //重置
step = 0
}
} else {
if step > batchTime && len(msgSet) > 0 { // 超时并且不为空
handleId++
guard <- struct{}{}
go handleMsg(2, handleId, msgSet, guard)
msgSet = nil //重置
step = 0
} else {
step++
time.Sleep(1 * time.Second) //休息一秒 step++
}
}
}
// 挂起主进程 防止退出
for {
runtime.GC()
}
}
整体的代码就是这样,具体的可以看注释了(癖好这东西 the more the more)
运行效果
就这样,里面牵涉多个小点,比较有意思
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。