前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >开源数据集成平台SeaTunnel:MySQL实时同步到es

开源数据集成平台SeaTunnel:MySQL实时同步到es

原创
作者头像
javalover123
修改2023-07-24 09:50:13
1.1K4
修改2023-07-24 09:50:13
举报
文章被收录于专栏:Java爱好者Java爱好者

一、前言

  • 最近,项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。
  • 目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步任务每月 500多元,有点小贵。
  • 其他环境:MySQL同步到ES,用的是 CloudCanal,不支持 数据转换,添加同步字段比较麻烦,社区版限制5个任务,不够用;MySQL同步到MySQL,用的是 debezium,不支持写入 ES。
  • 恰好3年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 系统为例

二、开源数据集成平台SeaTunnel

1. 简介

  • SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵活易用、易扩展并支持千亿级数据集成的解决方案。
  • Seaunnel 为实时(CDC)和批量数据提供高性能数据同步能力,支持十种以上数据源,已经在B站、腾讯云、字节等数百家公司使用。
  • 可以选择 SeaTunnel Zeta 引擎上运行,也可以在 Apache Flink 或 Spark 引擎上运行。?developer/article/2304250/undefined
    seatunnel-architecture.png
    seatunnel-architecture.png

2. 安装

  • 下载,这里选择 2.3.1 版本,执行 tar -xzvf apache-seatunnel-*.tar.gz 解压缩 Caused by: java.sql.SQLException: No suitable driver at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298) at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106) ... 20 more ... 11 more at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
  • 因为 2.3.2 版本,MySQL-CDC 找不到驱动bug修复详见

3. 安装 connectors 插件

  • 执行 bash bin/install-plugin.sh,国内建议先配置 maven 镜像,不然容易失败 或者 慢
  • 官方文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了PR?developer/article/2304250/undefined
    seatunnel-install-connectors-error.png
    seatunnel-install-connectors-error.png

4. 编写配置文件

  • config 目录下,新建配置文件:如 mysql-es-test.confenv { # You can set flink configuration here execution.parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 2000 #execution.checkpoint.interval = 10000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" }
  • 添加 env 配置 因为是 实时同步,这里 job.mode = "STREAMING",execution.parallelism 是 并发数
  • MySQL 实时同步,需开启 binlogsource { MySQL-CDC { result_table_name = "t1" server-id = 5656 username = "root" password = "pwd" table-names = ["db.t1"] base-url = "jdbc:mysql://host:3306/db" } }
  • 添加 数据源 配置 result_table_name 取个 临时表名,便于后续使用。table-names 必须是 数据库.表名,base-url 必须指定 数据库。?developer/article/2304250/undefinedstartup.mode 默认是 INITIAL,先同步历史数据,后增量同步,详情点击
  • 添加 转换 配置,sql 比较灵活函数列表请点击 transform { Sql { source_table_name = "t1" query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'" } }
  • 添加 输出 配置 CDC 实时同步 es,必须配置 primary_keys sink { Elasticsearch { hosts = ["host:9200"] username = "elastic" password = "pwd" index = "index_t1" # cdc required options primary_keys = ["id"] } }
  • 最终配置截图?developer/article/2304250/undefined
    seatunnel-mysql-cdc-es.png
    seatunnel-mysql-cdc-es.png

5. 启动任务

这里以 本地模式为例,另有 集群、spark、flink 模式。

代码语言:shell
复制
./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf

三、总结

本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明,否则将追究法律责任。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、开源数据集成平台SeaTunnel
    • 1. 简介
      • 2. 安装
        • 3. 安装 connectors 插件
          • 4. 编写配置文件
            • 5. 启动任务
            • 三、总结
            相关产品与服务
            数据集成
            数据集成(DataInLong)源于腾讯开源并孵化成功的 ASF 顶级项目 Apache InLong(应龙),依托 InLong 百万亿级别的数据接入和处理能力支持数据采集、汇聚、存储、分拣数据处理全流程,在跨云跨网环境下提供可靠、安全、敏捷的全场景异构数据源集成能力。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
            http://www.vxiaotou.com