前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbit实践:Golang生产者消费实例

rabbit实践:Golang生产者消费实例

原创
作者头像
yield9tk
发布2022-03-13 16:24:56
1.2K0
发布2022-03-13 16:24:56
举报

1. 概述

RabbitMQ是一款高性能的消息中间件。在实际开发中,Golang开发者都会使用https://github.com/streadway/amqp这个库,实现功能。但这个库仅仅只是对AMQP的实现。根据我们的业务场景,我总结出来一套生产者-消费者实践,它具有以下特点:

  • 保证断线重连
  • 生产者保证消息至少一次发送到队列中
  • 消费者将Ack交给执行业务函数
  • 消费者控制消费携程数量

2. Conn

Conn是抽象的一个连接对象,它将AMQP中的ConnectionChannel概念,整合到一起,并且提供了监听断线重连机制。

代码语言:go
复制
package rabbitx

import (
	"fmt"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

// ReconntTimes 重连次数
const ReconnectTimes = 100

// ReconntTimes 重连间隔,每次等比递增
const ReconnectInterval = 3

type Conn struct {
	conn *amqp.Connection
	ch   *amqp.Channel

	connStr     string
	isConnected bool
	recevier    chan *amqp.Error
}

// NewConn 新建连接
func NewConn(connStr string) (*Conn, error) {
	c := &Conn{connStr: connStr}

	// 启动时,无法连接则报错
	if err := c.connect(); err != nil {
		return nil, err
	}

	c.recevier = make(chan *amqp.Error)
	c.conn.NotifyClose(c.recevier)
	go c.listenConnClose()
	return c, nil
}

// connect 连接
func (c *Conn) connect() error {
	conn, err := amqp.Dial(c.connStr)
	if err != nil {
		return err
	}
	c.conn = conn
	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	c.ch = ch
	c.isConnected = true
	return nil
}

// listenConnClose 监听连接重连
func (c *Conn) listenConnClose() {
	for e := range c.recevier {
		fmt.Println("rabbitmq disconnect, reason: " + e.Reason)
		// 判断连接是否关闭
		if !c.conn.IsClosed() {
			c.conn.Close()
		}

		// 重新连接
		cTag := ReconnectTimes
		for i := 0; i < cTag; i++ {
			fmt.Println("rabbitmq 第" + strconv.Itoa(i) + "次重连")
			if err := c.connect(); err == nil {
				cTag = -1
				break
			}

			time.Sleep(time.Duration(i+1) * ReconnectInterval * time.Second)
		}

	}
}

3. Producer

Producer是定义的生产者,最大的特点就是;只发送消息到Exchange中,又因为Exchange不保存消息,所以在消费者上线建立队列之前,需要将退回消息重发:

代码语言:go
复制
package rabbitx

import (
	"errors"
	"fmt"
	"time"

	"github.com/streadway/amqp"
)

type Producer struct {
	c          *Conn
	exchange   string
	routingKey string

	basicReturn chan amqp.Return
}

// NewProducer 创建一个生产者
func NewProducer(conn *Conn, exchange string, routingKey string, exType string) *Producer {
	p := &Producer{
		exchange:   exchange,
		routingKey: routingKey,
	}

	p.c = conn

	// 交换机定义,如无则创建
	if err := p.c.ch.ExchangeDeclare(p.exchange, exType, true, false, false, false, nil); err != nil {
		panic(err)
	}

	// 如果消息没有发送到队列
	p.basicReturn = make(chan amqp.Return)
	p.c.ch.NotifyReturn(p.basicReturn)
	go p.listenPubReturn()
	return p
}

// listenPubReturn 监听消息被退回
func (p *Producer) listenPubReturn() {
	for r := range p.basicReturn {
		fmt.Println("return msg: " + string(r.Body))

		p.Publish(r.Body)
		time.Sleep(3 * time.Second)
	}
}

// Pushlish 发出消息
func (p *Producer) Publish(msg []byte) error {
	if !p.c.isConnected {
		return errors.New("conn is disconnected")
	}
	return p.c.ch.Publish(p.exchange, p.routingKey, true, false, amqp.Publishing{
		Body: msg,
	})
}

mandatory属性选择开启,才会将发送不成功的消息退回

4. Consumer

Consumer是定义的消费者,其中使用ants作为携程池,控制消费消息的速率,防止高峰期挤爆。

代码语言:go
复制
package rabbitx

import (
	"fmt"

	"github.com/panjf2000/ants/v2"
	"github.com/streadway/amqp"
)

const AntPoolSize = 100 // 消费携程池最大数量

type Consumer struct {
	c *Conn

	queueName string
}

// NewConsumer 新建收费者
func NewConsumer(conn *Conn, exchange string, routingKey string, exType string, queueName string) *Consumer {
	cu := &Consumer{c: conn, queueName: queueName}

	// 交换机定义,如无则创建
	if err := cu.c.ch.ExchangeDeclare(exchange, exType, true, false, false, false, nil); err != nil {
		panic(err)
	}
	// 队列定义,如无则创建
	q, err := cu.c.ch.QueueDeclare(queueName, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	// 绑定队列到交换机
	if err := cu.c.ch.QueueBind(q.Name, routingKey, exchange, false, nil); err != nil {
		panic(err)
	}

	return cu
}

// Consume 执行消费
func (cu *Consumer) Consume(dofunc func(d amqp.Delivery)) error {
	dChan, err := cu.c.ch.Consume(cu.queueName, "", false, false, false, false, nil)
	if err != nil {

		return err
	}

	// 携程池控制消费携程数目
	pool, err := ants.NewPoolWithFunc(AntPoolSize, func(i interface{}) {
		d := i.(amqp.Delivery)
		dofunc(d)
	})
	if err != nil {
		fmt.Printf("ants pool error:%s", err.Error())
		return err
	}
	defer pool.Release()

	for d := range dChan {
		if err := pool.Invoke(d); err != nil {
			fmt.Printf("ants pool error:%s", err.Error())
		}
	}

	return nil
}

5. 总结

这个实践的实例,满足大部分情况的使用;但并不通用,基本是需要设计结构上是通过Exchagne-RoutingKey-Queue这样的模式,才适合。当然,还需要改进的地方:

  • 生产者,发送消息失败;而自身崩溃了导致消息丢失
  • 消费者,消费消息自身崩溃导致消息重复消费,需要执行函数来过滤

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. Conn
  • 3. Producer
  • 4. Consumer
  • 5. 总结
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com