前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go协程加管道实现异步批量消费调度任务

Go协程加管道实现异步批量消费调度任务

原创
作者头像
pooky
修改2021-03-28 15:59:33
2.7K0
修改2021-03-28 15:59:33
举报
文章被收录于专栏:开发随笔开发随笔

周末了,这周遇到个问题当时没想明白,周末整理下

题目有点绕口 在现实的项目中这么搞的也不常见,里面牵涉多个知识点,整理下就当学习了。

程序需求:

1:接收任务,从异步消息队列里面监控接收最新的任务消息

2:处理任务,每个任务耗时可能不定

我们常规的做法就是开启一个长监听串行化来一个执行一个,实在不行就多开几个,这种呢人工干预比较重,有时候还盯着不是太好。

程序方案:

1:异步接受消息 ,开启一个协程接受消息 这个不用多开接收消息不会成为瓶颈

2:异步处理消息,开启协程异步处理对应的消息,这里有点要注意是一个消息就开一个处理协程 还是多个消息开启,是值得思考的。

3:批量处理,多个消息开启一个处理协程,防止开启过多的协程,

4:超时处理,如果长时间没有达到批量处理的数量限制,那么也要及时处理

5:限制过多的协程,这个其实没有什么必要,因为go所谓百万协程性能但是既然搞了这个例子 那就完善下吧

代码实现:

代码语言:javascript
复制
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"runtime"
	"strconv"
	"time"
)

var msgChanLimit = 20 //消息管道大小限制
var handleId int      //协程任务自定义id 用来区分 查看对应的任务批次 每处理一次 ++
/*
periodType 消息产生时间
	1 1秒一个信息会走到超时处理模块;
	2 1毫秒一个信息 会一直走正常处理模块;
	3 随机0秒到 1秒 模拟都会触发的情况
*/
var periodType = 3

var batchNum = 6  // 最多堆积多少未处理信息 进行一批次处理
var batchTime = 3 // 在未满消息 最长多久 进行一批次处理

var maxGoroutines = 10 //最大的协程数量 这个其实感觉意义不大  但是还是搞一个  边界预防吧

//接受消息
func getMsg(msgChan chan string) {
	msgId := 0
	for { //循环接收消息
		msgId++
		msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05")
		msgChan <- msg
		switch periodType {
		case 1:
			time.Sleep(1 * time.Second) // 秒
		case 2:
			time.Sleep(1 * time.Millisecond) // 毫秒
		case 3:
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // 随机
		}
	}
}

// 处理消息
func handleMsg(model int, handleId int, msgSet []string, guard chan struct{}) {
	for i, v := range msgSet {
		fmt.Println("HandleId:", handleId, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v)
		time.Sleep(1500 * time.Millisecond) //模拟具体处理消息是有耗时的 间隔越大 那么多个后台会有越多的挂起的处理
	}
	<-guard //释放一个限量

}

//无阻塞去接受消息
func unBlockRead(ch chan string) (msg string, err error) {
	select {
	case msg = <-ch:
		return msg, nil
	case <-time.After(time.Microsecond):
		return "", errors.New("nil")
	}
}

func main() {
	guard := make(chan struct{}, maxGoroutines) //守护协程数量限制
	msgChan := make(chan string, msgChanLimit)  //接受消息channel大小
	go getMsg(msgChan)                          // 开始接收消息
	msgSet := make([]string, 0)                 // 临时存放接收到的消息集合
	step := 0                                   //秒计数器 对应秒

	for { //主逻辑处理 开始处理
		if msg, err := unBlockRead(msgChan); err == nil { //接收到消息
			msgSet = append(msgSet, msg)
			if len(msgSet) == batchNum { //达到处理数量
				handleId++
				guard <- struct{}{}
				go handleMsg(1, handleId, msgSet, guard) // 处理当前的msgSet
				msgSet = nil                             //重置
				step = 0
			}
		} else {
			if step > batchTime && len(msgSet) > 0 { // 超时并且不为空
				handleId++
				guard <- struct{}{}
				go handleMsg(2, handleId, msgSet, guard)
				msgSet = nil //重置
				step = 0
			} else {
				step++
				time.Sleep(1 * time.Second) //休息一秒 step++
			}
		}
	}

	// 挂起主进程 防止退出
	for {
		runtime.GC()
	}
}

整体的代码就是这样,具体的可以看注释了(癖好这东西 the more the more)

运行效果

就这样,里面牵涉多个小点,比较有意思

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com