最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了
我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)
直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的 于是我想起了之前学过的技术栈
适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑 如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。
首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
redis:
host:
port: 6379
password:
lettuce:
pool:
max-active: 1000
max-idle: 1000
min-idle: 0
time-between-eviction-runs: 10s
max-wait: 10000
创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:
@Bean
public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setDefaultSerializer(new StringRedisSerializer());
return template;
}
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
发布消息:
redisTemplate.convertAndSend("channel_name", "message_payload");
在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。
订阅消息:
首先,创建一个MessageListener实现类来处理接收到的消息:
public class MessageListenerImpl implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
// 处理接收到的消息
String channel = new String(message.getChannel());
String payload = new String(message.getBody());
// 执行自定义的逻辑
}
}
创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:
LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();
创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称"));
listenerContainer.start();
通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。
以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增
于是就有了第二种写法 :
/***
* @title MessageManager
* @author SUZE
* @Date 2-17
**/
@Component
public class ReservedMessageManager {
private String ListenerId;
private String UserId;
private String message;
private final RedisTemplate<String, String> redisTemplate;
@Autowired
public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
subscribeToChannel("reserved");
}
@Resource
private SmsServer smsServer;
public void publishMessage(String channel, reserveMessage message) {
String Message=serialize(message);
redisTemplate.convertAndSend("channel_name", "message_payload");
redisTemplate.convertAndSend(channel, Message);
}
// 接收到消息时触发的事件
private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
if (reserveMessage != null) {
String userId = reserveMessage.getUserId();
String ListenerId=reserveMessage.getListenerId();
String message = reserveMessage.getMessage();
//TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容
switch (message){
//TODO 消息要给两边都发 所以要发两份 发信息的文案
case "wait":
smsServer.sendSms(userId,ListenerId,message);
break;
case "agree":
smsServer.sendSms(userId,ListenerId,message);
break;
case "refuse":
smsServer.sendSms(userId,ListenerId,message);
break;
case "over":
//这里要操作文档系统了
//拒绝的话 那就要监听一下
smsServer.sendSms(userId,ListenerId,message);
break;
}
//smsServer.sendSms(userId,ListenerId,message);
// 其他处理逻辑...
}
}
public void subscribeToChannel(String channel) {
redisTemplate.execute((RedisCallback<Object>) (connection) -> {
connection.subscribe((message, pattern) -> {
String channelName = new String(message.getChannel());
byte[] body = message.getBody();
// 解析接收到的消息
switch (channelName){
case "reserved":
reserveMessage reserveMessage = deserializeMessage(new String(body));
handleReserveMessage(channelName, reserveMessage);
break;
//还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由
}
}, channel.getBytes());
return null;
});
}
// 反序列化
private reserveMessage deserializeMessage(String body) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(body, reserveMessage.class);
} catch (IOException e) {
// 处理反序列化异常
e.printStackTrace();
return null;
}
}
// 序列化
public String serialize(reserveMessage reserveMessage) throws SerializationException {
if (reserveMessage == null) {
return null;
}
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(reserveMessage);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing object", e);
}
}
}
subscribeToChannel
方法接受一个channel
参数,用于指定要订阅的通道名称。redisTemplate.execute
方法用于执行Redis操作,并传入一个RedisCallback
回调函数。connection
参数,表示与Redis的连接。connection.subscribe
方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。message
对象中获取通道名称和消息体。new String(message.getChannel())
将通道名称转换为字符串类型,并存储在channelName
变量中。message.getBody()
获取消息体的字节数组表示,并存储在body
变量中。switch
语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。deserializeMessage
方法将消息体反序列化为reserveMessage
对象,并将其存储在名为reserveMessage
的局部变量中。handleReserveMessage
方法,将通道名称和反序列化后的reserveMessage
对象作为参数进行处理。handleReserveMessage
方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms
方法向指定的userId
和listenerId
发送短信。错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。 为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。 在你的reserveMessage类中
这个是改好的封装类:
@Data
public class reserveMessage {
private String UserId;
private String ListenerId;
private String message;
public reserveMessage() {
// 默认构造函数
}
public reserveMessage(String userId, String ListenerId,String message) {
this.UserId = userId;
this.ListenerId = ListenerId;
this.message=message;
}
}
成功
这里面的打印是代替了原本业务中的短信发送 也算是成了
这一期就到这 感谢观看