本文介绍如何通过事件总线EventBridge消息队列RabbitMQ版的数据推送到函数计算。

注意事项

连接器支持的地域以消息队列RabbitMQ版开服的地域为准。

步骤一:创建连接器

  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,单击自定义总线
  3. 在顶部菜单栏,选择地域。
  4. 自定义总线页面,找到目标总线,在其右侧操作列单击连接器管理
  5. 连接器页面,单击创建连接器
  6. 单击消息队列RabbitMQ版,完成以下操作,然后单击创建并启动
    • 名称:输入连接器的名称。
    • RabbitMQ实例:选择已创建的实例。
    • Vhost:选择已创建的Vhost。
    • Queue:选择已创建的Queue。

步骤二:创建事件规则

注意 目标服务和事件规则必须处于同一地域。
  1. 登录事件总线EventBridge控制台
  2. 在左侧导航栏,单击自定义总线
  3. 在顶部菜单栏,选择地域。
  4. 自定义总线页面,找到目标总线,在其右侧操作列单击规则管理
  5. 规则管理页面,单击创建规则
  6. 创建规则页面,完成以下操作。
    1. 配置基本信息页面,在名称文本框输入规则名称,在描述文本框输入规则的描述,然后单击下一步
    2. 配置事件模式页面,事件模式类型选择自定义事件模式,在事件模式内容代码框输入事件模式,然后单击下一步

      如需了解更多信息,请参见事件模式

    3. 配置事件目标页面,配置事件目标,然后单击创建
      说明 1个事件规则最多可以添加5个目标。
      • 服务类型:单击函数计算
      • 服务:选择已创建的服务。
      • 函数:选择已创建的函数。
      • 事件:单击转换内容的形式。
        • 完整事件:不做转换,直接投递原生CloudEvents 1.0协议中的完整结构。
        • 部分事件:通过JSONPath提取出需要投递到事件目标的内容。
        • 常量:事件只起到触发器的作用,不管事件内容是什么,投递内容为常量。
        • 模板:通过自定义变量和模板,将事件按照模板的示例输出。

          以下提供变量和模板的示例。

          变量示例:

          {
            "source":"$.source",
            "type":"$.type"
          }

          模板示例:

          The event comes from ${source},event type is ${type}.

        如需了解更多信息,请参见事件内容转换

      • 服务版本和别名:选择服务版本或服务别名。
        • 默认版本:LATEST。
        • 指定版本:选择服务版本。更多信息,请参见版本简介
        • 指定别名:选择服务别名。更多信息,请参见别名简介

步骤三:发布事件

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置接入点,在RabbitMQ版控制台实例详情页面查看。
        factory.setHost("xxx.xxx.aliyuncs.com");
        // 用户名,在RabbitMQ版控制台用户名密码管理页面查看。
        factory.setUsername("${UserName}");
        // 密码,在RabbitMQ版控制台用户名密码管理页面查看。
        factory.setPassword("${PassWord}");
        //一定要这个才能自动恢复。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
        factory.setVirtualHost("${VhostName}");
        // 默认端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于网络环境合理设置超时时间。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 开始发送消息。
        for (int i = 0; i < 100; i++  ) {
            // ${ExchangeName}必须在RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

结果验证

您可以在函数计算控制台使用表盘解读数据指标。

  1. 登录函数计算控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击服务及函数
  4. 服务及函数页面,在服务列表中单击目标服务。
  5. 函数列表页签,找到目标函数,然后单击函数名称列下的目标函数名称。
  6. 单击日志查询页签,查看日志。
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****