主题 Flink 高效 sink 写入 OSS
内容框架
背景介绍功能介绍如何配置如何使用直播回放链接 7/8讲
https://developer.aliyun.com/live/246851
Apache Flink 是新一代大数据计算引擎的代表 以分布式流计算为核心 同时支持批处理。特点
低延时 Flink 流式计算可以做到亚秒甚至毫秒级延时 相比之下 Spark 流计算很难达到秒级阿里云对象存储 Object Storage Service(OSS)
海量 无限容量 弹性伸缩Flink 应用广泛
流计算领域业内主要解决方案Flink 使用痛点
开源 ApacheFlink 尚不支持直接写入 OSS整体架构
两阶段 Checkpoint (检查点) 机制
第一阶段 MPU (MultiPartUpload 分片上传) 写入 OSSRecoverable Writer 可恢复性写入
临时文件以普通文件格式上传 OSS写入 OSS vs. ?写入 亚马逊S3
Native 实现 数据写入以 C 代码实现 相比 Java 更高效OSS 访问加速 JindoFS 提供新支持
环境要求
集群上有开源版本 Flink 软件 版本不低于1.10.1SDK 配置
下载所需 SDK 文件
jindo-flink-sink-${version}.jar将两个 jar 放置于集群 Flink 目录下 lib 文件夹
Flink 根目录通常可由 $FLINK_HOME 环境变量获取Java SPI 自动加载资源 无需额外配置
?文档链接(Github)
在程序中使用 JindoFS Flink Connector
确保集群能够访问 OSS Bucket
前提 已购买 OSS 产品 OSS 网站链接使用合适的路径 流式写入OSS Bucket
写入 OSS 须使用 oss:// 前缀路径 类似于oss:// user-bucket / user-defined-sink-dir
更多优化 用 JindoFS SDK 加速 OSS 访问 参考
?Github
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md
在程序中使用 JindoFS Flink Connector Java
在程序中开启 Flink Checkpoint
前提 使用可重发的数据源 如 Kafka建立
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
打开
env.enableCheckpointing( userDefinedCheckpointInterval , CheckpointingMode.EXACTLY_ONCE);
示例程序
下文中 outputStream 是一个预先形成的 DataStream 对象 若需写入 OSS 则可以这样添加 sinkString outputPath oss:// user-bucket / user-defined-sink-dir StreamingFileSink String sink StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder String ( UTF-8 ) ).build(); outputStream.addSink(sink);上述程序指定将 outputStream 中的String 内容写入 OSS 路径 oss:/// 最后还需用 env.execute() 语句执行 Flink 作业 env 是已建立的 StreamExecutionEnvironment 对象
在程序中使用 JindoFS Flink Connector Pyflink
与Java 示例类似 在 Pyflink 中使用 JindoFS Flink Connector 与写入 HDFS 等其他介质方式相同 只需
将写入路径写作合适的 OSS 路径例如 下列 Python 程序定义了一张位于 OSS 的表
sink_dest oss:// user-bucket / user-defined-sink-dir sink_ddl f CREATE TABLE mySink ( uid INT, pid INT ) PARTITIONED BY ( ) WITH ( connector filesystem , fpath {sink_dest} , format csv , sink.rolling-policy.file-size 2MB , sink.partition-commit.policy.kind success-file
然后将其添加到 StreamTableEnvironmentt_env 中即可 t_env.sql_update(sink_ddl)
在程序中使用 JindoFS Flink Connector 更多配置
用户通过 flink run 提交 java 或 pyflink 程序时 可以额外自定义一些参数 格式
????? flink run -m yarn-cluster -yD key1 value1 -yD key2 value2 ...
目前支持“熵注入”及“分片上传并行度”两项配置
熵注入(entropyinjection)
功能 将写入路径的一段特定字符串匹配出来 用一段随机的字符串进行替换? ?oss.entropy.key user-defined-key
??oss.entropy.length user-defined-length
分片上传并行度
配置参数 oss.upload.max.concurrent.uploads直接观看第四课 7/8讲 视频回放 获取实例讲解~
https://developer.aliyun.com/live/246851
?Github链接
https://github.com/aliyun/alibabacloud-jindofs
不错过每次直播信息、探讨更多数据湖 JindoFS OSS 相关技术问题 欢迎扫码加入钉钉交流群
计算的下一步发展是什么,将如何影响组织的战略?专家预测了边缘计算在2021年的发...
本文转载自公众号读芯术(ID:AI_Discovery) 如果你即将要面临大型科技公司的技术...
来源 | 阿里飞天CIO学堂微信公众号 金融数字化转型过程中,市场的细微变化,客户...
开源 RPC 框架有哪些呢?一类是跟某种特定语言平台绑定的,另一类是与语言无关即...
游戏市场的热度已经不言而喻,随着民众生活水平的提升,大家对于精神娱乐生活的...
一、背景 ? 我们大部分人的编程习惯都是线性编程,所谓线性编程就是一个请求涉及...
为了使伸缩组自动加入的实例自动部署应用,您需要创建私有镜像,确保该镜像上有...
与普通的IDC机房或服务器厂商相比,阿里云提供的云服务器ECS具有高可用性、安全...
最近,在为 Coco 优化分层架构之时,我陷入了各种决策困难之中。所以我通过不断...
一、数据中台是真的热 在2018年之前可能只有一少部分人在谈中台,从2018年下半年...