当前位置:主页 > 查看内容

使用Logstash实现mysql同步数据到ES——《我的Java打怪日记》

发布时间:2021-07-21 00:00| 位朋友查看

简介:mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是有些时候,我们在做查询时,由于查询条件的多样、变化多端(比如根据时间查、根据名称模糊查、根据id查等等),或者查询的数据来自很多不同的库表或者系统,这时就很难以一个较快的速度……

mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是有些时候,我们在做查询时,由于查询条件的多样、变化多端(比如根据时间查、根据名称模糊查、根据id查等等),或者查询的数据来自很多不同的库表或者系统,这时就很难以一个较快的速度(几百毫秒)直接获取我们想要的数据,而 elasticsearch 作为数据分析领域的佼佼者,刚好可以弥补这项不足,而我们要做的只需要将 mysql 中的数据同步到 elasticsearch 中即可,而 logstash 刚好就是一个同步神器,能够很好的满足我们的需求。

一:版本说明

logstash:7.6.1
elasticsearch:7.5.2
jdk:1.8
下载对应平台的压缩包,解压即可使用。

二:原理

使用logstash自带的logstash-inpu-jdbc插件,实现从mysql向es同步数据
官方插件使用说明:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

三:配置

使用 logstash-input-jdbc 插件读取 mysql 的数据,原理就是定时执行一个 sql,然后将 sql 执行的结果写入到流中;增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段.

input {
# 使用logstash-input-jdbc插件
 jdbc {
 # mysql连接驱动包
 jdbc_driver_library = "/data/www/logstash-7.6.1/mysql-connector-java-5.1.44.jar"
 # mysql连接驱动类
 jdbc_driver_class = "com.mysql.jdbc.Driver"
 # mysql jdbc连接串
 jdbc_connection_string = "jdbc:mysql://xxxxxxxxxxx"
 # 设置时区
 jdbc_default_timezone = "Asia/Shanghai"
 # 设置连接数据库的用户名和密码
 jdbc_user = "xxx"
 jdbc_password = "xxxxx"
 # 开启分页
 jdbc_paging_enabled = "true"
 # 开启分页后,每页数据条数大小
 jdbc_page_size = "1000"
 # 使用本地时区
 plugin_timezone = "local"
 # 开启大小写敏感
 lowercase_column_names = "false"
 # 同时时间间隔,分别对应”分 时 月 周 年“ 后面可以设置时区,也可不设,如果注释schedule,则只运行一次同步,进程及停止
 schedule = "*/5 * * * * Asia/Shanghai"
 # 读取数据库使用的SQL文件,其中有一个需要特别说明的关键字:sql_last_value,此值是读取last_run_metadata_path文件内记录的值
 statement_filepath = "/data/www/logstash-7.6.1/conf/xxxxxxx.sql"
 # 记录每次同步tracking_column_type追踪的值
 last_run_metadata_path = "/data/www/logstash-7.6.1/conf/xxxx_modify_time"
 # 开启记录最后同步的值
 record_last_run = "true"
 # 设置false为记录时间戳,true为其他字段的值
 use_column_value = "false"
 # 追踪字段的类型,可选timestamp、numeric
 tracking_column_type = timestamp
 # 具体跟踪的同步表字段
 tracking_column = "modify_time"
 # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
 clean_run = "false"
#filter部分对数据进行处理,此处只对@timestamp字段处理,设置为本地时间
filter {
 ruby {
 code = "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
 ruby {
 code = "event.set('@timestamp',event.get('timestamp'))"
 mutate {
 remove_field = ["timestamp"]
output {
#向es同步数据的相关配置
 elasticsearch {
 hosts = ["127.0.0.1:9200","127.0.0.1:9300"]
 #将数据中id字段设置为es中索引的_id,不使用es自动生成的随机_id,提供写入速度
 document_id = "%{id}"
 #定义es中索引名称,如es没有创建或不存在,会自动创建,自动创建的索引可能会不满足要求,建议使用自行创建的索引
 index = "xxxx_index"
 document_type = "_doc"
# 以下配置为debug,可在测试中开启,方便定位问题,生产或正式使用务必关闭,以免产生大量日志
 stdout {
 codec = rubydebug
}
jdbc_driver_library

mysql-connector-java-5.1.44.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。

jdbc_driver_class

驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了

sql_last_value

标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。

statement

执行同步的sql语句,可以同步部分数据;

statement_filepath

存储执行同步的sql语句,不和statement同时使用;

schedule

定时器,表示每隔多长时间同步一次数据,格式类似crontab;

tracking_column

表示表中哪一列用于判断logstash同步的位置信息,与sql_last_value比较判断是否需要同步这条数据;

tracking_column_type

racking_column指定列的类型,支持两种类型:numeric(默认)、timestamp;

last_run_metadata_path

存储sql_last_value值的文件名称及位置。

document_id

生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。

注意??

1、同步单张表可以使用:logstash -f /path/to/xxxx.conf
2、同步多张表时,可通过pipeline管道的方式
3、当表数据超过百万级别时,建议分批同步,可按照每100w条数据同步一次来进行,以加快同步速度,具体可使用表id的区间来确定同步数量,则在SQL中增加id区间值。

多表同步的pipeline.yml配置如下:

- pipeline.id: xxxx1
 pipeline.workers: 1
 path.config: /data/www/logstash-7.6.1/member_conf/xxxx1.conf
- pipeline.id: xxxx2
 pipeline.workers: 1
 path.config: /data/www/logstash-7.6.1/member_conf/xxxx2.conf
- pipeline.id: xxxx3
 pipeline.workers: 1
 path.config: /data/www/logstash-7.6.1/member_conf/xxxx3.conf
- pipeline.id: xxxx4
 pipeline.workers: 1
 path.config: /data/www/logstash-7.6.1/member_conf/xxxx4.conf
四:启动

单表启动,指定conf配置文件的位置

bin/logstash -f config/logstash-mysql-es.conf llogstash.log 2 1 

当使用多表同步时,启动时,不需要指定具体的conf配置文件,即:

bin/logstash logstash.log 2 1 

本文转自网络,原文链接:https://developer.aliyun.com/article/785553
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐