本文说明如何创建FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数。
前提条件
- 为消息队列Kafka版实例开启Connector。更多信息,请参见开启Connector。
- 为消息队列Kafka版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
本文以名称为fc-test-input的Topic为例。
- 在函数计算创建函数。更多信息,请参见使用控制台创建函数。
注意 函数类型必须为事件函数。
本文以服务名称为guide-hello_world、函数名称为hello_world、运行环境为Python的事件函数为例。该示例函数的代码如下:
# -*- coding: utf-8 -*- import logging # To enable the initializer feature # please implement the initializer function as below: # def initializer(context): # logger = logging.getLogger() # logger.info('initializing') def handler(event, context): logger = logging.getLogger() logger.info('hello world:' + bytes.decode(event)) return 'hello world:' + bytes.decode(event)
操作流程
使用FC Sink Connector将数据从消息队列Kafka版实例的数据源Topic导出至函数计算的函数的操作流程如下:
- 可选:为FC Sink Connector开启公网访问注意 如果您不需要使FC Sink Connector跨地域访问函数计算,您可以直接跳过该步骤。
- 可选:使FC Sink Connector跨账号访问函数计算
注意 如果您不需要使FC Sink Connector跨账号访问函数计算,您可以直接跳过该步骤。
- 可选:创建FC Sink Connector依赖的Topic和Consumer Group
注意
- 如果您不需要自定义Topic和Consumer Group的名称,您可以直接跳过该步骤。
- 部分FC Sink Connector依赖的Topic的存储引擎必须为Local存储,大版本为0.10.2的消息队列Kafka版实例不支持手动创建Local存储的Topic,只支持自动创建。
- 创建并部署FC Sink Connector
- 结果验证
为FC Sink Connector开启公网访问
如需使FC Sink Connector跨地域访问其他阿里云服务,您需要为FC Sink Connector开启公网访问。具体操作,请参见为Connector开启公网访问。
创建自定义权限策略
在目标账号下创建访问函数计算的自定义权限策略。
- 登录访问控制控制台。
- 在左侧导航栏,选择 。
- 在权限策略管理页面,单击创建权限策略。
- 在新建自定义权限策略页面,创建自定义权限策略。
创建RAM角色
在目标账号下创建RAM角色。由于RAM角色不支持直接选择消息队列Kafka版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手工修改信任策略。
添加权限
在目标账号下为创建的RAM角色授予访问函数计算的权限。
- 在左侧导航栏,单击RAM角色管理。
- 在RAM角色管理页面,找到AliyunKafkaConnectorRole,在其右侧操作列,单击添加权限。
- 在添加权限面板,添加KafkaConnectorFcAccess权限。
- 在选择权限区域,选择自定义策略。
- 在权限策略名称列表,找到KafkaConnectorFcAccess,单击KafkaConnectorFcAccess。
- 单击确定。
- 单击完成。
创建FC Sink Connector依赖的Topic
您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的5个Topic。
创建FC Sink Connector依赖的Consumer Group
您可以在消息队列Kafka版控制台手动创建FC Sink Connector依赖的2个Consumer Group。
创建并部署FC Sink Connector
创建并部署用于将数据从消息队列Kafka版同步至函数计算的FC Sink Connector。
发送测试消息
部署FC Sink Connector后,您可以向消息队列Kafka版的数据源Topic发送消息,测试数据能否被同步至函数计算。
- 在Connector(公测组件)页面,找到目标Connector,在其右侧操作列,单击测试。
- 在Topic管理页面,选择实例,找到fc-test-input,在其右侧操作列,单击发送消息。
- 在发送消息面板,发送测试消息。
- 在分区文本框,输入0。
- 在Message Key文本框,输入1。
- 在Message Value文本框,输入1。
- 单击发送。
查看函数日志
向消息队列Kafka版的数据源Topic发送消息后,查看函数日志,验证是否收到消息。更多信息,请参见配置并查看函数日志。
日志中显示发送的测试消息。