首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

轻量级消息队列RedisQueue

消息队列(Message Queue)是分布式系统必不可少的中间件,大部分消息队列产品(如RocketMQ/RabbitMQ/Kafka等)要求团队有比较强的技术实力,不适用于中小团队,并且对.NET技术的支持力度不够。而Redis实现的轻量级消息队列很简单,仅有Redis常规操作,几乎不需要开发团队掌握额外的知识!

随着强大的.NET5发布,.NET技术栈里面怎可没有最佳的消息队列搭档?

本文从高性能Redis组件NewLife.Redis出发,借用快递业务场景,讲解.NET中如何使用Redis作为消息队列,搭建企业级分布式系统架构!

什么是消息队列

消息队列就是消息在传输过程中保存消息的容器,其核心功用是削峰解耦

早高峰,快递公司的货车前来各驿站卸货,多名站点工作人员使用PDA扫描到站,大量信息进入系统(1000tps),而通知快递公司的接口只有400tps的处理能力。

通过增加MQ来保存消息,让超过系统处理能力的消息滞留下来,等早高峰过后,系统即可完成处理。此为削峰

在快递柜业务流程中,快递员投柜后需要经历扣减系统费、短信通知用户和推送通知快递公司三个业务动作。传统做法需要依次执行这些业务东西,如果其中某一步异常(例如用户手机未开机或者快递公司接口故障),将会延迟甚至中断整个投柜流程,严重影响用户体验。

如果接口层收到投柜数据后,写入消息到MQ,后续三个子系统各自消费处理,将可以完美解决该问题,并且子系统故障不影响上游系统!此为解耦

内存消息队列

最简单的消息队列,可以由阻塞集合BlockingCollection实现

public static void Start(){ ? ?var queue = new BlockingCollection(); ? ?// 独立线程消费 ? ?var thread = new Thread(s => Consume(queue)); ? ?thread.Start(); ? ?// 发布消息 ? ?Public(queue);}private static void Public(BlockingCollection queue){ ? ?var area = new Area { Code = 110000, Name = "北京市" }; ? ?XTrace.WriteLine("Public ", area.Code, area.Name); ? ?queue.Add(area); ? ?Thread.Sleep(1000); ? ?area = new Area { Code = 310000, Name = "上海市" }; ? ?XTrace.WriteLine("Public ", area.Code, area.Name); ? ?queue.Add(area); ? ?Thread.Sleep(1000); ? ?area = new Area { Code = 440100, Name = "广州市" }; ? ?XTrace.WriteLine("Public ", area.Code, area.Name); ? ?queue.Add(area); ? ?Thread.Sleep(1000);}private static void Consume(BlockingCollection queue){ ? ?while (true) ? ?{ ? ? ? ?var msg = queue.Take(); ? ? ? ?if (msg != null) ? ? ? ?{ ? ? ? ? ? ?XTrace.WriteLine("Consume ", msg.Code, msg.Name); ? ? ? ?} ? ?}}

每秒钟生产一个消息,都被独立线程消费到。

Redis做消息队列

Redis的LIST结构,具备左进右出的功能,再使用BRPOP的阻塞弹出,即可完成一个最基本的消息队列 RedisQueue。

GetQueue取得队列后,Add方法发布消息。

TakeOne拉取消费一条消息,指定10秒阻塞,10秒内有消息立马返回,否则等到10秒超时后返回空。

public static void Start(FullRedis redis){ ? ?var topic = "EasyQueue"; ? ?// 独立线程消费 ? ?var thread = new Thread(s => Consume(redis, topic)); ? ?thread.Start(); ? ?// 发布消息 ? ?Public(redis, topic);}private static void Public(FullRedis redis, String topic){ ? ?var queue = redis.GetQueue(topic); ? ?queue.Add(new Area { Code = 110000, Name = "北京市" }); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 310000, Name = "上海市" }); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 440100, Name = "广州市" }); ? ?Thread.Sleep(1000);}private static void Consume(FullRedis redis, String topic){ ? ?var queue = redis.GetQueue(topic); ? ?while (true) ? ?{ ? ? ? ?var msg = queue.TakeOne(10); ? ? ? ?if (msg != null) ? ? ? ?{ ? ? ? ? ? ?XTrace.WriteLine("Consume ", msg.Code, msg.Name); ? ? ? ?} ? ?}}

LPUSH 生产消息(插入列表),BRPOP 消费消息(弹出列表),因此,消息被消费后就消失了!

从日志时间可以看到,生产与消费的时间差在1~3ms之间,延迟极低!

注释消费代码后重跑,可以在Redis中看到发布的消息

需要确认的队列

如果通知快递公司的物流推送子系统处理消息时出错,消息丢失怎么办?显然不可能让上游再发一次!

这里我们需要支持消费确认的可信队列 RedisReliableQueue。消费之后,除非程序主动确认消费,否则Redis不许删除消息。

GetReliableQueue获取队列实例后,Add发布消息,TakeOneAsync异步消费一条消息,并指定10秒阻塞超时,处理完成后再通过Acknowledge确认。

public static void Start(FullRedis redis){ ? ?var topic = "AckQueue"; ? ?// 独立线程消费 ? ?var source = new CancellationTokenSource(); ? ?Task.Run(() => ConsumeAsync(redis, topic, source.Token)); ? ?// 发布消息 ? ?Public(redis, topic); ? ?source.Cancel();}private static void Public(FullRedis redis, String topic){ ? ?var queue = redis.GetReliableQueue(topic); ? ?queue.Add(new Area { Code = 110000, Name = "北京市" }); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 310000, Name = "上海市" }); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 440100, Name = "广州市" }); ? ?Thread.Sleep(1000);}private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token){ ? ?var queue = redis.GetReliableQueue(topic); ? ?while (!token.IsCancellationRequested) ? ?{ ? ? ? ?var mqMsg = await queue.TakeOneAsync(10); ? ? ? ?if (mqMsg != null) ? ? ? ?{ ? ? ? ? ? ?var msg = mqMsg.ToJsonEntity(); ? ? ? ? ? ?XTrace.WriteLine("Consume ", msg.Code, msg.Name); ? ? ? ? ? ?queue.Acknowledge(mqMsg); ? ? ? ?} ? ?}}

LPUSH 生产消息(插入列表),BRPOPLPUSH 消费消息(弹出列表并插入另一个Ack列表),这是确保不丢消息的关键。 LREM 从Ack列表删除,用于消费完成后确认。

如果消费异常,就不会执行该确认操作,滞留在Ack列表的消息,60秒后重新回来主列表。

脑筋急转弯: 如果应用进程异常退出,未确认的消息该怎么处理?

注释消费代码后重跑,可以在Redis中看到发布的消息,跟普通队列一样,使用了LIST结构

处理“北京市”消息时,如果没有Acknowledge确认,Redis里面将会看到一个名为AckQueue:Ack:*的LIST结构,里面保存这这一条消息。所以,可信队列本质上就是在消费时,同步把消息备份到另一个LIST里面,确认操作就是从待确认LIST里面删除。

自从有了这个可信队列,基本上足够满足90%以上业务需求。

延迟队列

某一天,小马哥说,快递员投柜一定时间时候,如果用户没有来取件,那么系统需要收取超期取件费,需要一个延迟队列。

于是想到了Redis的ZSET,我们再来一个 RedisDelayQueue,Add生产消息时多了一个参数,指定若干秒后可以消费到该消息,消费用法跟可信队列一样。

public static void Start(FullRedis redis){ ? ?var topic = "DelayQueue"; ? ?// 独立线程消费 ? ?var source = new CancellationTokenSource(); ? ?Task.Run(() => ConsumeAsync(redis, topic, source.Token)); ? ?// 发布消息 ? ?Public(redis, topic); ? ?source.Cancel();}private static void Public(FullRedis redis, String topic){ ? ?var queue = redis.GetDelayQueue(topic); ? ?queue.Add(new Area { Code = 110000, Name = "北京市" }, 2); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 310000, Name = "上海市" }, 2); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 440100, Name = "广州市" }, 2); ? ?Thread.Sleep(1000);}private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token){ ? ?var queue = redis.GetDelayQueue(topic); ? ?while (!token.IsCancellationRequested) ? ?{ ? ? ? ?var mqMsg = await queue.TakeOneAsync(10); ? ? ? ?if (mqMsg != null) ? ? ? ?{ ? ? ? ? ? ?var msg = mqMsg.ToJsonEntity(); ? ? ? ? ? ?XTrace.WriteLine("Consume ", msg.Code, msg.Name); ? ? ? ? ? ?queue.Acknowledge(mqMsg); ? ? ? ?} ? ?}}

上图可以看到,每秒生产一个消息,2秒后消费到北京市,再过1秒消费到上海市(距离上海市的发布刚好2秒)。这里少了广州市,因为测试程序在生产广州市后,只等了1秒就退出。

我们从Redis中可以看到广州市这一条消息,存放在ZSET结构中。

多消费组可重复消费的队列

又一天,数据中台的小伙伴想要消费订单队列,但是不能够啊,LIST结构做的队列,每个消息只能被消费一次,如果数据中台的系统消费掉了,其它业务系统就会失去消息。

我们想到了Redis5.0开始新增的STREAM结构,再次封装RedisStream。

public static void Start(FullRedis redis){ ? ?var topic = "FullQueue"; ? ?var queue = redis.GetStream(topic); ? ?// 独立线程消费 ? ?var source = new CancellationTokenSource(); ? ?Task.Run(() => ConsumeAsync(redis, topic, source.Token)); ? ?// 发布消息 ? ?Public(redis, topic); ? ?//source.Cancel();}private static void Public(FullRedis redis, String topic){ ? ?var queue = redis.GetStream(topic); ? ?queue.Add(new Area { Code = 110000, Name = "北京市" }); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 310000, Name = "上海市" }); ? ?Thread.Sleep(1000); ? ?queue.Add(new Area { Code = 440100, Name = "广州市" }); ? ?Thread.Sleep(1000);}private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token){ ? ?var queue = redis.GetStream(topic); ? ?queue.Group = "test"; ? ?queue.GroupCreate(queue.Group); ? ?while (!token.IsCancellationRequested) ? ?{ ? ? ? ?try ? ? ? ?{ ? ? ? ? ? ?var mqMsg = await queue.TakeMessageAsync(10); ? ? ? ? ? ?if (mqMsg != null) ? ? ? ? ? ?{ ? ? ? ? ? ? ? ?var msg = mqMsg.GetBody(); ? ? ? ? ? ? ? ?XTrace.WriteLine("Consume ", msg.Code, msg.Name); ? ? ? ? ? ? ? ?queue.Acknowledge(mqMsg.Id); ? ? ? ? ? ?} ? ? ? ?} ? ? ? ?catch (Exception ex) ? ? ? ?{ ? ? ? ? ? ?XTrace.WriteException(ex); ? ? ? ?} ? ?}}

生产过程不变,消费大循环有点特别,主要是STREAM消费回来的消息,有它自己的Id,只需要对这个Id确认就可以了。

上图中,红色框是生产,紫色框是消费。

再来看看Redis中,可以看到STREAM消息还在里面。数据中台组只需要使用不同的消费组Group,即可独立消费,不用担心抢其它系统消息啦。

最佳实践

RedisQueue在中通大数据分析中,用于缓冲等待写入Oracle/MySql的数据,多线程计算后写入队列,然后由专门线程定时拉取一批(500行),执行批量Insert/Update操作。该系统队列,每天10亿条消息,Redis内存分配8G,实际使用小于100M,除非消费端故障导致产生积压。

递易智能科技全部使用可信队列 RedisReliableQueue,约200多个队列,按系统分布在各自的Redis实例,公有云2G内存主从版。积压消息小于10万时,队列专用的Redis实例内存占用小于100M,几乎不占内存空间。

公司业务每天带来100万多订单,由此衍生的消息数约1000万条,从未丢失消息!

例程代码

代码:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20201224A06JOM00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券
http://www.vxiaotou.com