ADB PG版基于Flink 自定义conenctor支持读取 维表 和写入 结果表 。通过Flink SQL即可实现对ADB PG版的访问。
前提条件版本要求Flink 1.11及以上版本
ADBPG 6.0版本
网络要求ADBPG实例与Flink实例在同一VPC下
ADBPG设置白名单 开放对Flink实例的网络访问。
1、购买6.0版本ADBPG实例 创建账号 并设置白名单
2、连接数据库 创建待写入目标表、和待查询源数据表
create table test6(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean);
insert into test6 values(0 , b0 , c0 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(0 , b0 , c0 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(1 , b1 , c1 , 41, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(1 , b1 , c1 , 41, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(2 , b2 , c2 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(2 , b2 , c2 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(3 , b3 , c3 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(3 , b3 , c3 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(4 , b4 , c4 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(4 , b4 , c4 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(5 , b5 , c5 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(5 , b5 , c5 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(6 , b6 , c6 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(6 , b6 , c6 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(7 , b7 , c7 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(7 , b7 , c7 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(8 , b8 , c8 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(8 , b8 , c8 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
insert into test6 values(9 , b9 , c9 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.1, ? ?90.1, ? ? true
insert into test6 values(9 , b9 , c9 , 40, ?50, ? ?60, ? ?70, ? ? ? 80.2, ? ?90.2, ? ? false
create table test7(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean, k int,l text,m text,n int,o int, p int, q bigint, r float, s double precision, t boolean);
1、创建Flink vvp版实例 要保证Flink实例与ADBPG实例处于同一个VPC下
2、创建SQL作业
3、作业开发
代码参考
CREATE TEMPORARY TABLE datagen_source2(
? a INT,
? b VARCHAR,
? c CHAR(15),
? d TINYINT,
? e SMALLINT,
? f INT,
? g BIGINT,
? h FLOAT,
? i DOUBLE,
? j BOOLEAN,
? proctime AS PROCTIME()
) with (
? connector datagen
);
CREATE TEMPORARY TABLE adbpg_dim2 (
? a INT,
? b VARCHAR,
? c CHAR(15),
? d TINYINT,
? e SMALLINT,
? f INT,
? g BIGINT,
? h FLOAT,
? i DOUBLE,
? j BOOLEAN
) with (
? ? connector adbpg ,
? ? password password ,
? ? tablename tablename ,
? ? username username ,
? ? url jdbc:postgresql://url:port/databasename ,
? ? maxretrytimes 2 ,
? ? connectionmaxactive 5 ,
? ? targetschema public ,
? ? casesensitive 0 ,
? ? retrywaittime 200 ,
? ? cache lru ,
? ? cacheSize 1000000 ,
? ? cacheTTLMs 2000000000
CREATE TEMPORARY TABLE adbpg_sink2(
? a INT,
? b VARCHAR,
? c CHAR(15),
? d TINYINT,
? e SMALLINT,
? f INT,
? g BIGINT,
? h FLOAT,
? i DOUBLE,
? j BOOLEAN,
? k INT,
? l VARCHAR,
? m CHAR(15),
? n TINYINT,
? o SMALLINT,
? p INT,
? q BIGINT,
? r FLOAT,
? s DOUBLE,
? t BOOLEAN
) with (
? ? connector adbpg ,
? ? password password ,
? ? tablename tablename ,
? ? username username ,
? ? url jdbc:postgresql://url:port/databasename ,
? ? maxretrytimes 2 ,
? ? batchsize 100 ,
? ? connectionmaxactive 5 ,
? ? conflictmode ignore ,
? ? usecopy 0 ,
? ? targetschema public ,
? ? exceptionmode ignore ,
? ? casesensitive 0 ,
? ? writemode 0 ,
? ? retrywaittime 200
);
insert into adbpg_sink2 select T.a, T.b, T.c, T.d, T.e, T.f, T.g, T.h, T.i, T.j, H.a, H.b, H.c, H.d, H.e, H.f, H.g, H.h, H.i, H.j FROM datagen_source2 AS T JOIN adbpg_dim2 FOR SYSTEM_TIME AS OF T.proctime AS H ON MOD(T.a, 10) H.a;
4、上传jar包
flink-connector-adbpg-1.11.1-jar-with-dependencies.jar
5、运行上线
点击验证、运行、上线 观察日志和数据库判断是否有异常 是否成功写入数据库。
维表参数说明参数名
参数含义
备注
url
ADBPG连接地址
必填 需要填写内网连接地址。
tableName
ADBPG源表名
必填 填写维表对应的ADBPG数据仓库中的表名。
userName
ADBPG用户名
必填。
password
ADBPG密码
必填。
joinMaxRows
左表一条记录连接右表的最大记录数
非必填 表示在一对多连接时 左表一条记录连接右表的最大记录数 默认值为1024 。在一对多连接的记录数过多时 可能会极大的影响流任务的性能 因此您需要增大Cache的内存 cacheSize限制的是左表key的个数 。
maxRetryTimes
单次SQL失败后重试次数
非必填 实际执行时 可能会因为各种因素造成执行失败 比如网络或者IO不稳定 超时等原因 ADBPG维表支持SQL执行失败后自动重试 用maxRetryTimes参数可以设定重试次数。默认值为3。
connectionMaxActive
连接池最大连接数
非必填 ADBPG维表中内置连接池 设置合理的连接池最大连接数可以兼顾效率和安全性 默认值为5。
retryWaitTime
重试休眠时间
非必填 每次SQL失败重试之间的sleep间隔 单位ms 默认值100
targetSchema
查询的ADBPG schema
非必填 默认值public
caseSensitive
是否大小写敏感
非必填 默认值0 即不敏感 填1可以设置为敏感
cache
缓存策略
目前分析型数据库PostgreSQL版支持以下三种缓存策略
none 默认值 无缓存。lru 缓存维表里的部分数据。源表来一条数据 系统会先查找Cache 如果没有找到 则去物理维表中查询。需要配置相关参数 缓存大小 cacheSize 和缓存更新时间间隔 cacheTTLMs 。cacheSize
设置LRU缓存的最大行数
非必填 默认为10000行
cacheTTLMs
缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔 重新加载一次维表中的最新数据 保证源表能JOIN到维表的最新数据。
非必填 单位为毫秒。默认不设置此参数 表示不重新加载维表中的新数据。
参数
注释说明
是否必选
备注
type
类型
是
固定值 为adbpg
url
jdbc连接地址
是
分析型数据库PostgreSQL版数据库的jdbc连接地址 。
格式为 jdbc:postgresql:// yourNetworkAddress : PortId / yourDatabaseName
其中 yourNetworkAddress 为目标分析型数据库PostgreSQL版数据库的主机地址 PortId 为连接端口 yourDatabaseName 为连接的数据库。
示例 url ’jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres‘。
tableName
表名
是
无。
username
账号
是
无。
password
密码
是
无。
maxRetryTimes
写入重试次数
否
默认为3。
useCopy
是否采用copy API写入数据
否
默认为1 表示采用copy API方式写入
当取值为0时 代表根据writeMode字段采用其他方式写入数据。
batchSize
一次批量写入的条数
否
默认值为5000。
exceptionMode
当存在写入过程中出现异常时的处理策略
否
支持以下两种取值
1 ignore : 忽略出现导致写入异常的数据
2 strict : 日志记录导致写入异常的数据 然后停止任务
默认取值为 ignore
conflictMode
当出现主键冲突或者唯一索引冲突时的处理策略
否
支持以下三种取值
1 ignore : 忽略出现导致主键冲突的数据
2 strict : 日志记录导致主键冲突的数据 然后停止任务
3 update :当出现主键冲突时更新为新值。
4) upsert : 以insert on conflict方式处理主键冲突。
默认取值为 ignore
targetSchema
schema名称
否
默认值为 public 。
writeMode
在useCopy字段基础上 更细分的写入方式
否
默认值为1 代表采用copy API写入数据
在useCopy字段为0的场景下 可以设定writeMode字段采用其他写入方式
writeMode 0 采用insert方式写入数据
writeMode 2 采用upsert方式写入数据。
upsert含义见文档
注意采用upsert方式写入时需要设定主键字段 设定主键的方式参考示例语句。
3月8日消息,据外媒报道,物联网研究机构IoTAnalytics称,三分之一的制造商正计...
1.话费没了,流量没了,短信没了,寒假没了,作业还有。 2.别紧张,我又不是什么...
域名 过期多少天可以注册?域名过期大概60天以后会被注册局删除,删除之后就可以...
A公司在华为云中购买了多种资源,公司中有多个职能团队,这些职能团队需要使用一...
2020年注定是个不平凡的一年,新冠肺炎疫情全球蔓延,对全球经济发展、科技进步...
如果当前云服务器系统盘容量不能满足您的存储需要,您可以在购买了云服务器之后...
公司网站 域名 空间多少钱?公司搭 建网站 缺少不了域名和空间,具体需要多少钱...
1.现在赶作业是一种时尚,所以我很赶得上潮流。 2.我房间里堆满了情人节卡片,...
文章已收录Github精选,欢迎Star: https://github.com/yehongzhi/learningSumma...
目标 在仿真理论中,生成随机变量是最重要的构建块之一,而这些随机变量大多是由...