前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.12支持对接Atlas【使用Atlas收集Flink元数据】

Flink1.12支持对接Atlas【使用Atlas收集Flink元数据】

作者头像
用户1410343
发布2021-04-09 16:19:42
1.7K0
发布2021-04-09 16:19:42
举报
文章被收录于专栏:about云about云

问题导读 1.Atlas中实体具体指什么? 2.如何为Flink创建Atlas实体类型定义? 3.如何验证元数据收集? 在Cloudera Streaming Analytics中,可以将Flink与Apache Atlas一起使用,以跟踪Flink作业的输入和输出数据。 Atlas是沿袭和元数据管理解决方案,在Cloudera Data Platform上受支持。这意味着可以查找,组织和管理有关Flink应用程序以及它们如何相互关联的数据的不同资产。这实现了一系列数据管理和法规遵从性用例。 有关Atlas的更多信息,请参阅Cloudera Runtime文档。 Flink元数据集合中的Atlas实体 在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。 为Flink创建Atlas实体类型定义 在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。 验证元数据收集 启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。 Flink元数据集合中的Atlas实体 在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。 在向Atlas提交更新时,Flink应用程序会描述自身以及用作源和接收器的实体。Atlas创建并更新相应的实体,并从收集到的和已经可用的实体创建沿袭。在内部,Flink客户端和Atlas服务器之间的通信是使用Kafka主题实现的。该解决方案被Atlas社区称为Flink挂钩。

为Flink创建Atlas实体类型定义 在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。 默认情况下,Atlas不包括Flink的元数据源。管理员必须手动将实体类型定义上载到群集,才能启动Flink元数据收集。 注意: 启用或禁用TLS时,Atlas管理服务器的默认端口分别为31433和31000。 步骤 1.使用Atlas REST API将设计的实体类型定义上载到集群。

代码语言:javascript
复制
  1. curl -k -u <atlas_admin>:<atlas_admin_pwd> --location --request POST 'https://<atlas_server_host>:<atlas_server_port>/api/atlas/v2/types/typedefs' \
  2. --header 'Content-Type: application/json' \
  3. --data-raw '{
  4. "enumDefs": [],
  5. "structDefs": [],
  6. "classificationDefs": [],
  7. "entityDefs": [
  8. {
  9. "name": "flink_application",
  10. "superTypes": [
  11. "Process"
  12. ],
  13. "serviceType": "flink",
  14. "typeVersion": "1.0",
  15. "attributeDefs": [
  16. {
  17. "name": "id",
  18. "typeName": "string",
  19. "cardinality": "SINGLE",
  20. "isIndexable": true,
  21. "isOptional": false,
  22. "isUnique": true
  23. },
  24. {
  25. "name": "startTime",
  26. "typeName": "date",
  27. "cardinality": "SINGLE",
  28. "isIndexable": false,
  29. "isOptional": true,
  30. "isUnique": false
  31. },
  32. {
  33. "name": "endTime",
  34. "typeName": "date",
  35. "cardinality": "SINGLE",
  36. "isIndexable": false,
  37. "isOptional": true,
  38. "isUnique": false
  39. },
  40. {
  41. "name": "conf",
  42. "typeName": "map<string,string>",
  43. "cardinality": "SINGLE",
  44. "isIndexable": false,
  45. "isOptional": true,
  46. "isUnique": false
  47. },
  48. {
  49. "name": "inputs",
  50. "typeName": "array<string>",
  51. "cardinality": "LIST",
  52. "isIndexable": false,
  53. "isOptional": false,
  54. "isUnique": false
  55. },
  56. {
  57. "name": "outputs",
  58. "typeName": "array<string>",
  59. "cardinality": "LIST",
  60. "isIndexable": false,
  61. "isOptional": false,
  62. "isUnique": false
  63. }
  64. ]
  65. }
  66. ],
  67. "relationshipDefs": []
  68. }'

复制代码

2.登录到Cloudera Manager。 3.转到Flink>配置。 4.在搜索栏中搜索“启用图集”。 5.启用Atlas元数据收集。

成功提交后,Flink客户端会通知Atlas有关作业的元数据。 验证元数据收集 启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。 要验证元数据集合,可以从“运行Flink作业”中运行“流式WordCount”示例。 在日志中,出现以下新行:

代码语言:javascript
复制
  1. ...
  2. 20/05/13 06:28:12 INFO hook.FlinkAtlasHook: Collecting metadata for a new Flink Application: Streaming WordCount
  3. ...
  4. 20/05/13 06:30:35 INFO hook.AtlasHook: <== Shutdown of Atlas Hook

复制代码

Flink通过Kafka主题与Atlas通信,默认情况下,该主题名为ATLAS_HOOK。 转载注明本文链接

https://www.aboutyun.com/forum.php?mod=viewthread&tid=30521

----------------------------END----------------------------

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-23,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 About云 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com