实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为flink消费腾讯云CMQ数据实践。原文自Raigor,已获得授权,分享给大家~
Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。
在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
若之前未使用过VPC,日志,存储这些组件,需要先进行创建。
创建完后的集群如下:
在 CMQ 控制台的【主题订阅】-> 【新建】主题,输入主题名,其他保持默认值即可。新建的主题如下:
CMQ 主题
在 CMQ 控制台的【队列】-> 【新建】主题,输入队列名称、消息生命周期、堆积消息数量上限,其他保持默认值即可。我们这里新建两个队列,其中一个用来订阅 CMQ 主题模型数据,另一个用作 Oceanus 作业的目的表。新建的主题如下:
在 CMQ 主题列表页,点击主题操作列的【订阅】链接,进入【订阅者】列表,新建订阅,输入订阅名,终端类型选择 Queue 队列服务,订阅地址选择cs2的队列,其他保持默认值。新建的订阅者如下:
在 Oceanus 控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector cmq-1.1.1。
2.2 创建数据源表和目的表
在作业的【开发调试】->【插入模板】选择 CMQ 读取 & 写入的模板,并添加。修改参数queue、secret-id、secret-key。
注意:强烈建议使用具有最小权限的secret-id和secret-key,并注意保密,防止泄漏带来的安全风险。
CMQ 读取 & 写入
CREATE TABLE `CMQSourceTable` ( `id` bigint, `request_method` varchar(80), `response` varchar(80), PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据 ) WITH ( 'connector' = 'cmq', --必须为 'cmq' 'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', --cmq所在地域的nameServer 'queue' = 'cs2', --cmq的队列名 'secret-id' = 'Your SecretId', --账号secretId 'secret-key' = 'Your SecretKey', --账号secretKey 'sign-method' = 'HmacSHA1', --签名的方式 'format' = 'csv', --定义数据格式(JSON 格式) 'batch-size' = '16', --批量消费消息的个数/批量发送消息的个数 'request-timeout' = '5000ms', --请求的超时时间 'polling-wait-timeout'= '10s', --source参数; 获取不到数据情况下的等待时间 'key-alive-timeout'= '5min', --source参数;含primary key的消息,CMQ去重的有效时间 'retry-times' = '3', --sink参数;发送消息的重试次数 'max-block-timeout' = '0s' --sink参数;批量发送数据的最大等待时间 ); CREATE TABLE `CMQSinkTable` ( `id` bigint, `request_method` varchar(80), `response` varchar(80), PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据 ) WITH ( 'connector' = 'cmq', --必须为 'cmq' 'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', --cmq所在地域的nameServer 'queue' = 'sink_queue', --cmq的队列名 'secret-id' = 'Your SecretId', --账号secretId 'secret-key' = 'Your SecretKey', --账号secretKey 'sign-method' = 'HmacSHA1', --签名的方式 'format' = 'csv', --定义数据格式(JSON 格式) 'batch-size' = '16', --批量消费消息的个数/批量发送消息的个数 'request-timeout' = '5000ms', --请求的超时时间 'polling-wait-timeout'= '10s', --source参数; 获取不到数据情况下的等待时间 'key-alive-timeout'= '5min', --source参数;含primary key的消息,CMQ去重的有效时间 'retry-times' = '3', --sink参数;发送消息的重试次数 'max-block-timeout' = '0s' --sink参数;批量发送数据的最大等待时间 ); insert into CMQSinkTable select * from CMQSourceTable;
这里只做最简单的数据插入。
insert into CMQSinkTable select *from CMQSourceTable;
在 CMQ 控制台往名为test的主题中发送消息,可在sink_queue的队列中接收到消息。
原文链接:https://cloud.tencent.com/developer/article/1857665
在TOP云(zuntop.com)科技租赁过服务器的站长都知道独立服务器在价格上比VPS主...
中国最?好的一朵云飘进了华瑞银行。阿里云将进一步助力华瑞银行All in Cloud。 -...
9月17日,2020云栖大会上,阿里云正式发布工业大脑3.0。 阿里云智能资深产品专家...
2020年对于云计算行业来说是突破性的一年,因为公共云供应商增加了收入,而疫情...
本文转载自网络,原文链接:https://mp.weixin.qq.com/s/vlOUg46B5bcmToX-fjavJQ...
最近,DevOps的采用导致了企业计算的重大转变。除无服务器计算,动态配置和即付...
很长时间没有更新原创文章了,但是还一直在思考和沉淀当中,后面公众号会更频繁...
一、PostgreSQL行业位置 一 行业位置 首先我们看一看RDS PostgreSQL在整个行业当...
查看表结构,sbtest1有主键、k_1二级索引、i_c二级索引 CREATE TABLE `sbtest1` ...
定义 this是函数运行时自动生成的内部对象,即调用函数的那个对象。(不一定很准...