当前位置:主页 > 查看内容

数据湖实操讲解【OSS 访问加速】第八讲:Flume 高效写入 OSS

发布时间:2021-06-03 00:00| 位朋友查看

简介:本期导读 【OSS 访问加速】第八讲 主题 Flume 高效写入 OSS 讲师 焱冰 阿里巴巴计算平台事业部 EMR 技术专家 内容框架 Flume 简介Flume 常用组件Flume 使用 JindoFS SDKFlume 实战 JindoFS SDK 直播回放链接 7/8讲 https://developer.aliyun.com/live/246851……
本期导读 【OSS 访问加速】第八讲


主题 Flume 高效写入 OSS


讲师 焱冰 阿里巴巴计算平台事业部 EMR 技术专家


内容框架

Flume 简介Flume 常用组件Flume 使用 JindoFS SDKFlume 实战 JindoFS SDK


直播回放链接 7/8讲

https://developer.aliyun.com/live/246851



一、Flume 简介

Apache Flume?简介Apache Flume 是 Apache 基金会的一个顶级项目 以下简称 Flume。Flume 是一个分布式、可靠、高可用的系统 支持从不同数据源高效地收集、聚合、迁移大量日志数据 聚合到中心化的数据存储服务。Flume 使用最多的场景是日志收集 也可以通过定制 Source 来传输其他不同类型的数据。E-MapReduce 从 3.16.0 版本开始支持 Apache Flume。


image.png


Flume 中的概念及术语

image.png


一个 Flume Agent 由 Source、Channel、Sink 组成。


Event

数据流通过 Flume Agent 的基本单位。Event 由一个装载字节数组负载 Payload 和一个可选的字符串属性集合组成。

image.png


Source

数据源收集器 从外部数据源收集数据 并发送到 Channel。


Channel

Source 和 Sink 之间的缓冲队列。

Sink

从 Channel 中获取 Event 并将以事务的形式 commit 到外部存储中。一旦事务 commit 成功 该 Event 会从 Channel 中移除。



二、Flume 常用组件


常用组件介绍

常见 Source

Avro Source 通过监听 Avro 端口获取 Avro Client 发送的事件。Avro 是 Hadoop 提供的一种协议 用于序列化反序列化数据。Exec Source 通过监听命令行输出获取数据 如 tail -f /var/log/messages。NetCat TCP Source:?监听指定 tcp 端口获取数据。类似的还有 Netcat UDP Source。Taildir Source:?监控目录下的多个文件 会记录偏移量 不会丢失数据 最为常用。


常见 Channel

Memory Channel:?缓存到内存中 性能高 最为常用。File Channel:?缓存到文件中 会记录 checkpoint 和 data 文件 可靠性高 但性能较差。JDBC Channel:?缓存到关系型数据库中。Kakfa Channel 通过 Kafka 来缓存数据。


常见 Sink

Logger?Sink:?用于测试Avro Sink:?转换成 Avro Event 主要用于连接多个 Flume Agent。HDFS Sink:?写入 HDFS 最为常用。Hive sink:?写入 Hive 表或分区 使用 Hive 事务写 events。Kafka sink:?写入 Kafka。


文档

官方文档
https://flume.apache.org/documentation.html中文文档
https://flume.liyifeng.org/


三、Flume 使用 JindoFS?SDK


Flume 使用 JindoFS SDK 写入 OSS

环境要求

在集群上已经部署 Flume 已部署 JindoSDK 3.4 以上版本。


为什么需要使用 JindoFS SDK 写入 OSS

Flume 通过 flush() 调用保证事务性写入 OSS 本身不支持 Flush 功能 通过 JindoFS SDK 写入 OSS 虽然不能让 flush 后的数据立刻可见 但是可以保证 flush() 后的数据不丢失 Flume 作业失败后 可以使用 JindoFS 命令恢复 flush 过的数据。


配置示例

xxx.sinks.oss_sink.hdfs.path oss://${your_bucket}/flume_dir/%Y-%m-%d/%H 
xxx.sinks.oss_sink.hdfs.batchSize 100000 
xxx.sinks.oss_sink.hdfs.round true
xxx.sinks.oss_sink.hdfs.roundValue 15
xxx.sinks.oss_sink.hdfs.Unit minute
xxx.sinks.oss_sink.hdfs.filePrefix your_topic
xxx.sinks.oss_sink.rollSize 3600
xxx.sinks.oss_sink.threadsPoolSize 30

- 文档链接?

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md


在 EMR 集群内对 Flush 文件恢复

jindo jfs -recover [-R]
 [-flushStagingPath {flushStagingPath}]
 [-accessKeyId ${accessKeyId}]
 [-accessKeySecret ${accessKeySecret}]
 path 

注 如需递归恢复(-R) 建议先停止 Flume 任务 避免 Flume 任务运行异常。


在 EMR 集群外对 Flush 文件恢复

JindoOssFileSystem jindoFileSystem (JindoOssFileSystem) fs; 
boolean isFolder true; 
jindoFileSystem.recover(path, isFolder);

- 文档链接?

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flume/jindofs_sdk_on_flume_for_oss.md


四、Flume 实战?JindoFS?SDK


自建Flume 使用 JindoFS SDK 压缩写入 OSS

环境准备

Hadoop-2.8.5


下载

F?lume-1.9.0:wgethttps://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz


添加依赖

cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib
cp hadoop-auth-2.8.5.jar $FLUME_HOME/lib
cp hadoop-common-2.8.5.jar $FLUME_HOME/lib
cp hadoop-hdfs-2.8.5.jar $FLUME_HOME/lib
cp commons-io-2.4.jar $FLUME_HOME/lib
cp htrace-core4-4.0.1-incubating.jar $FLUME_HOME/lib
wget https://smartdata-binary.oss-cn-shanghai.aliyuncs.com/jindofs-sdk-3.5.0.jar -O 
$FLUME_HOME/lib/jindofs-sdk-3.5.0.jar

配置 JindoFS SDK

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_how_to_hadoop.md


配置

a1.sources r1

a1.sinks k1

a1.channels c1


a1.sources.r1.type exec

a1.sources.r1.command tail -F /tmp/test.log


a1.channels.c1.type memory

a1.channels.c1.capacity 10000

a1.channels.c1.transactionCapacity 20


a1.sinks.k1.type hdfs

a1.sinks.k1.hdfs.path oss://yanbin-hd2-test/%Y-%m-%d/%H

a1.sinks.k1.hdfs.filePrefix test

a1.sinks.k1.hdfs.batchSize 20

a1.sinks.k1.hdfs.codeC? gzip

a1.sinks.k1.hdfs.fileType CompressedStream

a1.sinks.k1.rollCount 20

a2.sinks.k1.hdfs.minBlockReplicas 1


a1.sources.r1.channels c1

a1.sinks.k1.channel c1


日志仿真

while true; do echo date /tmp/test.log; sleep 1; done

Flume 启动

bin/flume-ng agent --name a1 -c conf -f conf/flume-exec-oss.conf -Dflume.root.logger INFO,console

结果

image.png


直接观看第四课 7/8讲 视频回放 获取实例讲解~

https://developer.aliyun.com/live/246851




?Github链接

https://github.com/aliyun/alibabacloud-jindofs


不错过每次直播信息、探讨更多数据湖 JindoFS OSS 相关技术问题 欢迎扫码加入钉钉交流群

image.png




本文转自网络,原文链接:https://developer.aliyun.com/article/784472
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐