主题 Flume 高效写入 OSS
内容框架
Flume 简介Flume 常用组件Flume 使用 JindoFS SDKFlume 实战 JindoFS SDK直播回放链接 7/8讲
https://developer.aliyun.com/live/246851
一个 Flume Agent 由 Source、Channel、Sink 组成。
Event
数据流通过 Flume Agent 的基本单位。Event 由一个装载字节数组负载 Payload 和一个可选的字符串属性集合组成。Source
数据源收集器 从外部数据源收集数据 并发送到 Channel。Channel
Source 和 Sink 之间的缓冲队列。Sink
从 Channel 中获取 Event 并将以事务的形式 commit 到外部存储中。一旦事务 commit 成功 该 Event 会从 Channel 中移除。常见 Source
Avro Source 通过监听 Avro 端口获取 Avro Client 发送的事件。Avro 是 Hadoop 提供的一种协议 用于序列化反序列化数据。Exec Source 通过监听命令行输出获取数据 如 tail -f /var/log/messages。NetCat TCP Source:?监听指定 tcp 端口获取数据。类似的还有 Netcat UDP Source。Taildir Source:?监控目录下的多个文件 会记录偏移量 不会丢失数据 最为常用。常见 Channel
Memory Channel:?缓存到内存中 性能高 最为常用。File Channel:?缓存到文件中 会记录 checkpoint 和 data 文件 可靠性高 但性能较差。JDBC Channel:?缓存到关系型数据库中。Kakfa Channel 通过 Kafka 来缓存数据。常见 Sink
Logger?Sink:?用于测试Avro Sink:?转换成 Avro Event 主要用于连接多个 Flume Agent。HDFS Sink:?写入 HDFS 最为常用。Hive sink:?写入 Hive 表或分区 使用 Hive 事务写 events。Kafka sink:?写入 Kafka。文档
官方文档环境要求
在集群上已经部署 Flume 已部署 JindoSDK 3.4 以上版本。
为什么需要使用 JindoFS SDK 写入 OSS
Flume 通过 flush() 调用保证事务性写入 OSS 本身不支持 Flush 功能 通过 JindoFS SDK 写入 OSS 虽然不能让 flush 后的数据立刻可见 但是可以保证 flush() 后的数据不丢失 Flume 作业失败后 可以使用 JindoFS 命令恢复 flush 过的数据。
配置示例
xxx.sinks.oss_sink.hdfs.path oss://${your_bucket}/flume_dir/%Y-%m-%d/%H xxx.sinks.oss_sink.hdfs.batchSize 100000 xxx.sinks.oss_sink.hdfs.round true xxx.sinks.oss_sink.hdfs.roundValue 15 xxx.sinks.oss_sink.hdfs.Unit minute xxx.sinks.oss_sink.hdfs.filePrefix your_topic xxx.sinks.oss_sink.rollSize 3600 xxx.sinks.oss_sink.threadsPoolSize 30
- 文档链接?
在 EMR 集群内对 Flush 文件恢复
jindo jfs -recover [-R] [-flushStagingPath {flushStagingPath}] [-accessKeyId ${accessKeyId}] [-accessKeySecret ${accessKeySecret}] path
注 如需递归恢复(-R) 建议先停止 Flume 任务 避免 Flume 任务运行异常。
在 EMR 集群外对 Flush 文件恢复
JindoOssFileSystem jindoFileSystem (JindoOssFileSystem) fs; boolean isFolder true; jindoFileSystem.recover(path, isFolder);
- 文档链接?
环境准备
Hadoop-2.8.5
下载
F?lume-1.9.0:wgethttps://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
添加依赖
cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib cp commons-configuration-1.6.jar $FLUME_HOME/lib cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib cp hadoop-common-2.8.5.jar $FLUME_HOME/lib cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib cp commons-io-2.4.jar $FLUME_HOME/lib cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O $FLUME_HOME/lib/jindofs-sdk-3.5.0.jar
配置 JindoFS SDK
https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md
配置
a1.sources r1
a1.sinks k1
a1.channels c1
a1.sources.r1.type exec
a1.sources.r1.command tail -F /tmp/test.log
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 20
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path oss://yanbin-hd2-test/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix test
a1.sinks.k1.hdfs.batchSize 20
a1.sinks.k1.hdfs.codeC? gzip
a1.sinks.k1.hdfs.fileType CompressedStream
a1.sinks.k1.rollCount 20
a2.sinks.k1.hdfs.minBlockReplicas 1
a1.sources.r1.channels c1
a1.sinks.k1.channel c1
日志仿真
while true; do echo date /tmp/test.log; sleep 1; done
Flume 启动
bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf -Dflume.root.logger INFO,console
结果
直接观看第四课 7/8讲 视频回放 获取实例讲解~
https://developer.aliyun.com/live/246851
?Github链接
https://github.com/aliyun/alibabacloud-jindofs
不错过每次直播信息、探讨更多数据湖 JindoFS OSS 相关技术问题 欢迎扫码加入钉钉交流群
本文转载自公众号读芯术(ID:AI_Discovery) 如果你即将要面临大型科技公司的技术...
来源 | 阿里飞天CIO学堂微信公众号 金融数字化转型过程中,市场的细微变化,客户...
开源 RPC 框架有哪些呢?一类是跟某种特定语言平台绑定的,另一类是与语言无关即...
一、数据中台是真的热 在2018年之前可能只有一少部分人在谈中台,从2018年下半年...
一、背景 ? 我们大部分人的编程习惯都是线性编程,所谓线性编程就是一个请求涉及...
为了使伸缩组自动加入的实例自动部署应用,您需要创建私有镜像,确保该镜像上有...
最近,在为 Coco 优化分层架构之时,我陷入了各种决策困难之中。所以我通过不断...
计算的下一步发展是什么,将如何影响组织的战略?专家预测了边缘计算在2021年的发...
与普通的IDC机房或服务器厂商相比,阿里云提供的云服务器ECS具有高可用性、安全...
游戏市场的热度已经不言而喻,随着民众生活水平的提升,大家对于精神娱乐生活的...