前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot: RabbitMQ消息队列之同时消费多条消息

SpringBoot: RabbitMQ消息队列之同时消费多条消息

作者头像
Freedom123
发布2024-03-29 14:44:52
1500
发布2024-03-29 14:44:52
举报
文章被收录于专栏:DevOpsDevOps

一、RabbotMQ接口介绍

1. basicQos预取方法参数解析

basicQos(int prefetchCount) basicQos(int prefetchCount, boolean global) basicQos(int prefetchSize, int prefetchCount, boolean global)

参数:

  • prefetchSize:可接收消息的大小
  • prefetchCount:处理消息最大的数量。
  • global:是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的
2. basicConsumer消费方法参数解析

basicConsumer(String queue, Consumer consumer) basicConsumer(String queue, boolean autoAck, Consumer consumer)

参数:

  • queue:监听的队列名称
  • autoAck:是否自动消费消息
  • consumer:使用的消费者类

二、非Spring项目集成-失败不重试,直接确认

Consumer.java 消费者类

代码语言:javascript
复制
package com.lmc.mq.nospring;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:06
 * @version: 1.0
 */
public class Consumer {

    private final static String QUEUE_NAME = "lmc-test"; //队列名称

    public static void main(String[] args) {
        initModule();
    }

    public static void initModule() {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("xx.xx.xx.xx"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        //由连接工厂创建连接
        Connection connection = null;

        try {
            connection = connectionFactory.newConnection();
            //通过连接创建信道
            final Channel channel = connection.createChannel();
            channel.basicQos(0, 3, true);
            //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
                @Override
                public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    MqMessageDispatcher.doDispatch(new String(body, "UTF-8"), channel, envelope);
                }
            };

            //监听指定的queue。会一直监听。
            //参数:要监听的queue、是否自动确认消息、使用的Consumer
            channel.basicConsume(QUEUE_NAME, false, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }

}

MqMessageDispatcher.java 多线程类:同时并发处理多个消息

代码语言:javascript
复制
package com.lmc.mq.nospring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */
public class MqMessageDispatcher {

    public static Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);

    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });

    }

    public static void doDispatch(String message, Channel channel, Envelope envelope) {
        msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
    }

    private static class MessageHandleTask implements Runnable {

        String message;
        Channel channel;
        Envelope envelope;

        public MessageHandleTask(String message, Channel channel, Envelope envelope) {
            this.message = message;
            this.channel = channel;
            this.envelope = envelope;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            logger.info("Received message: " + message);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                // 手动确认消息,若自动确认则不需要写以下该行
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (IOException e) {
                System.err.println("fail to confirm message:" + message);
            }
        }
    }


}

三、非Spring项目集成-失败重试5次,再直接确认

MqMessageDispatcher.java

代码语言:javascript
复制
package com.lmc.mq.nospring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */
public class MqMessageDispatcher {

    public static final Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);

    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);

    public static Map<String, Integer> cacheMap = new HashMap(5);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });
    }

    public static void doDispatch(String message, Channel channel, Envelope envelope) {
        msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
    }

    private static class MessageHandleTask implements Runnable {

        String message;
        Channel channel;
        Envelope envelope;

        public MessageHandleTask(String message, Channel channel, Envelope envelope) {
            this.message = message;
            this.channel = channel;
            this.envelope = envelope;
        }

        @Override
        public void run() {

            int currentTimes = 0; // 当前重试次数
            boolean isSuccess = false; // 消息是否处理成功
            // 获取当前消息重试次数,(这种情况适合每条消息内容不一样,最好每条消息都有唯一标识)
            if (cacheMap.containsKey(message)) {
                currentTimes = cacheMap.get(message);
            }else {
                cacheMap.put(message, 0);
            }

            long start = System.currentTimeMillis();
            logger.info("Received message: " + message);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                if (isSuccess) {
                    // 手动确认消息
                    logger.info("message[" + message + "] consumer success.(Ack)");
                    cacheMap.put(message, 0);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }else {
                    if (currentTimes >= 5) {
                        // 手动确认消息,若自动确认则不需要写以下该行
                        logger.warn("message[" + message + "] consumer fail,have retry 5 times.(Ack)");
                        cacheMap.put(message, 0);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }else {
                        // 处理失败,重试未5次,重新处理
                        cacheMap.put(message, ++currentTimes);
                        logger.warn("message[" + message + "] consumer fail,prepare to retry " + currentTimes + " times...(Nack)");
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                }

            } catch (IOException e) {
                System.err.println("fail to confirm message:" + message);
            }
        }
    }


}

四、SpringBoot集成

使用springboot同时处理多个消息,只需要在配置文件中,添加以下配置:

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 开启手动确认
        concurrency: 1 #消费者最小数量
        max-concurrency: 3 #消费之最大数量
        prefetch: 3 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)

监听类 LmcTestConsumer:

代码语言:javascript
复制
package com.lmc.mq.spring.consumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-18 19:32
 * @version: 1.0
 */
@Component
public class LmcTestConsumer {

    public static final Logger logger = LoggerFactory.getLogger(LmcTestConsumer.class);


    @RabbitHandler
    @RabbitListener(queues = "lmc-test")
    public void handler(@Payload Message message, Channel channel) {
        try {
            String msg = new String(message.getBody(), "UTF-8");
            MqMessageDispatcher.doDispatch(msg, channel, message.getMessageProperties().getDeliveryTag());
        } catch (IOException e) {
            logger.error(e.getMessage());
        } catch (NullPointerException e1) {
            logger.error(e1.getMessage());
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

}

其他

参考:https://gitee.com/lmchh/lmc-tools/tree/master/tools-message-queue

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、RabbotMQ接口介绍
    • 1. basicQos预取方法参数解析
      • 2. basicConsumer消费方法参数解析
      • 二、非Spring项目集成-失败不重试,直接确认
      • 三、非Spring项目集成-失败重试5次,再直接确认
      • 四、SpringBoot集成
      • 其他
      相关产品与服务
      消息队列
      腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
      http://www.vxiaotou.com