本文介绍如何通过事件总线EventBridge将消息队列RabbitMQ版的数据推送到函数计算。
注意事项
连接器支持的地域以消息队列RabbitMQ版开服的地域为准。
步骤一:创建连接器
步骤二:创建事件规则
- 登录事件总线EventBridge控制台。
- 在左侧导航栏,单击自定义总线。
- 在顶部菜单栏,选择地域。
- 在自定义总线页面,找到目标总线,在其右侧操作列单击规则管理。
- 在规则管理页面,单击创建规则。
- 在创建规则页面,完成以下操作。
步骤三:发布事件
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在RabbitMQ版控制台实例详情页面查看。
factory.setHost("xxx.xxx.aliyuncs.com");
// 用户名,在RabbitMQ版控制台用户名密码管理页面查看。
factory.setUsername("${UserName}");
// 密码,在RabbitMQ版控制台用户名密码管理页面查看。
factory.setPassword("${PassWord}");
//一定要这个才能自动恢复。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
// 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境合理设置超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 开始发送消息。
for (int i = 0; i < 100; i++ ) {
// ${ExchangeName}必须在RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
// BindingKey根据业务需求填入相应的BindingKey。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));
}
connection.close();
}
}
结果验证
您可以在函数计算控制台使用表盘解读数据指标。