当前位置:主页 > 查看内容

通过开源Flink读写云原生数据仓库AnalyticDB PostgreSQL

发布时间:2021-04-27 00:00| 位朋友查看

简介:ADB PG版基于Flink 自定义conenctor支持读取 维表 和写入 结果表 。通过Flink SQL即可实现对ADB PG版的访问。 前提条件版本要求 Flink 1.11及以上版本 ADBPG 6.0版本 网络要求 ADBPG实例与Flink实例在同一VPC下 ADBPG设置白名单 开放对Flink实例的网络访问。……

ADB PG版基于Flink 自定义conenctor支持读取 维表 和写入 结果表 。通过Flink SQL即可实现对ADB PG版的访问。

前提条件版本要求

Flink 1.11及以上版本

ADBPG 6.0版本

网络要求

ADBPG实例与Flink实例在同一VPC下

ADBPG设置白名单 开放对Flink实例的网络访问。


操作步骤设置ADBPG实例

1、购买6.0版本ADBPG实例 创建账号 并设置白名单

image

2、连接数据库 创建待写入目标表、和待查询源数据表

create table test6(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean);

insert into test6 values(0 , b0 , c0 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(0 , b0 , c0 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(1 , b1 , c1 , 41, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(1 , b1 , c1 , 41, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(2 , b2 , c2 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(2 , b2 , c2 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(3 , b3 , c3 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(3 , b3 , c3 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(4 , b4 , c4 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(4 , b4 , c4 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(5 , b5 , c5 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(5 , b5 , c5 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(6 , b6 , c6 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(6 , b6 , c6 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(7 , b7 , c7 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(7 , b7 , c7 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(8 , b8 , c8 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(8 , b8 , c8 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

insert into test6 values(9 , b9 , c9 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true

insert into test6 values(9 , b9 , c9 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false

create table test7(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean, k int,l text,m text,n int,o int, p int, q bigint, r float, s double precision, t boolean);


Flink作业开发

1、创建Flink vvp版实例 要保证Flink实例与ADBPG实例处于同一个VPC下

2、创建SQL作业

image

3、作业开发

代码参考

CREATE TEMPORARY TABLE datagen_source2(

? a INT,

? b VARCHAR,

? c CHAR(15),

? d TINYINT,

? e SMALLINT,

? f INT,

? g BIGINT,

? h FLOAT,

? i DOUBLE,

? j BOOLEAN,

? proctime AS PROCTIME()

) with (

? connector datagen

);


CREATE TEMPORARY TABLE adbpg_dim2 (

? a INT,

? b VARCHAR,

? c CHAR(15),

? d TINYINT,

? e SMALLINT,

? f INT,

? g BIGINT,

? h FLOAT,

? i DOUBLE,

? j BOOLEAN

) with (

? ? connector adbpg ,

? ? password password ,

? ? tablename tablename ,

? ? username username ,

? ? url jdbc:postgresql://url:port/databasename ,

? ? maxretrytimes 2 ,

? ? connectionmaxactive 5 ,

? ? targetschema public ,

? ? casesensitive 0 ,

? ? retrywaittime 200 ,

? ? cache lru ,

? ? cacheSize 1000000 ,

? ? cacheTTLMs 2000000000


CREATE TEMPORARY TABLE adbpg_sink2(

? a INT,

? b VARCHAR,

? c CHAR(15),

? d TINYINT,

? e SMALLINT,

? f INT,

? g BIGINT,

? h FLOAT,

? i DOUBLE,

? j BOOLEAN,

? k INT,

? l VARCHAR,

? m CHAR(15),

? n TINYINT,

? o SMALLINT,

? p INT,

? q BIGINT,

? r FLOAT,

? s DOUBLE,

? t BOOLEAN

) with (

? ? connector adbpg ,

? ? password password ,

? ? tablename tablename ,

? ? username username ,

? ? url jdbc:postgresql://url:port/databasename ,

? ? maxretrytimes 2 ,

? ? batchsize 100 ,

? ? connectionmaxactive 5 ,

? ? conflictmode ignore ,

? ? usecopy 0 ,

? ? targetschema public ,

? ? exceptionmode ignore ,

? ? casesensitive 0 ,

? ? writemode 0 ,

? ? retrywaittime 200

);


insert into adbpg_sink2 select T.a, T.b, T.c, T.d, T.e, T.f, T.g, T.h, T.i, T.j, H.a, H.b, H.c, H.d, H.e, H.f, H.g, H.h, H.i, H.j FROM datagen_source2 AS T JOIN adbpg_dim2 FOR SYSTEM_TIME AS OF T.proctime AS H ON MOD(T.a, 10) H.a;

4、上传jar包

image

flink-connector-adbpg-1.11.1-jar-with-dependencies.jar


5、运行上线

点击验证、运行、上线 观察日志和数据库判断是否有异常 是否成功写入数据库。

image13.png

维表参数说明

参数名

参数含义

备注

url

ADBPG连接地址

必填 需要填写内网连接地址。

tableName

ADBPG源表名

必填 填写维表对应的ADBPG数据仓库中的表名。

userName

ADBPG用户名

必填。

password

ADBPG密码

必填。

joinMaxRows

左表一条记录连接右表的最大记录数

非必填 表示在一对多连接时 左表一条记录连接右表的最大记录数 默认值为1024 。在一对多连接的记录数过多时 可能会极大的影响流任务的性能 因此您需要增大Cache的内存 cacheSize限制的是左表key的个数 。

maxRetryTimes

单次SQL失败后重试次数

非必填 实际执行时 可能会因为各种因素造成执行失败 比如网络或者IO不稳定 超时等原因 ADBPG维表支持SQL执行失败后自动重试 用maxRetryTimes参数可以设定重试次数。默认值为3。

connectionMaxActive

连接池最大连接数

非必填 ADBPG维表中内置连接池 设置合理的连接池最大连接数可以兼顾效率和安全性 默认值为5。

retryWaitTime

重试休眠时间

非必填 每次SQL失败重试之间的sleep间隔 单位ms 默认值100

targetSchema

查询的ADBPG schema

非必填 默认值public

caseSensitive

是否大小写敏感

非必填 默认值0 即不敏感 填1可以设置为敏感

cache

缓存策略

目前分析型数据库PostgreSQL版支持以下三种缓存策略

none 默认值 无缓存。lru 缓存维表里的部分数据。源表来一条数据 系统会先查找Cache 如果没有找到 则去物理维表中查询。需要配置相关参数 缓存大小 cacheSize 和缓存更新时间间隔 cacheTTLMs 。

cacheSize

设置LRU缓存的最大行数

非必填 默认为10000行

cacheTTLMs

缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔 重新加载一次维表中的最新数据 保证源表能JOIN到维表的最新数据。

非必填 单位为毫秒。默认不设置此参数 表示不重新加载维表中的新数据。


结果表参数说明

参数

注释说明

是否必选

备注

type

类型

固定值 为adbpg

url

jdbc连接地址

分析型数据库PostgreSQL版数据库的jdbc连接地址 。

格式为 jdbc:postgresql:// yourNetworkAddress : PortId / yourDatabaseName

其中 yourNetworkAddress 为目标分析型数据库PostgreSQL版数据库的主机地址 PortId 为连接端口 yourDatabaseName 为连接的数据库。

示例 url ’jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres‘。

tableName

表名

无。

username

账号

无。

password

密码

无。

maxRetryTimes

写入重试次数

默认为3。

useCopy

是否采用copy API写入数据

默认为1 表示采用copy API方式写入

当取值为0时 代表根据writeMode字段采用其他方式写入数据。

batchSize

一次批量写入的条数

默认值为5000。

exceptionMode

当存在写入过程中出现异常时的处理策略

支持以下两种取值

1 ignore : 忽略出现导致写入异常的数据

2 strict : 日志记录导致写入异常的数据 然后停止任务

默认取值为 ignore

conflictMode

当出现主键冲突或者唯一索引冲突时的处理策略

支持以下三种取值

1 ignore : 忽略出现导致主键冲突的数据

2 strict : 日志记录导致主键冲突的数据 然后停止任务

3 update :当出现主键冲突时更新为新值。

4) upsert : 以insert on conflict方式处理主键冲突。

默认取值为 ignore

targetSchema

schema名称

默认值为 public 。

writeMode

在useCopy字段基础上 更细分的写入方式

默认值为1 代表采用copy API写入数据

在useCopy字段为0的场景下 可以设定writeMode字段采用其他写入方式

writeMode 0 采用insert方式写入数据

writeMode 2 采用upsert方式写入数据。

upsert含义见文档

注意采用upsert方式写入时需要设定主键字段 设定主键的方式参考示例语句。



本文转自网络,原文链接:https://developer.aliyun.com/article/783805
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐