mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是有些时候,我们在做查询时,由于查询条件的多样、变化多端(比如根据时间查、根据名称模糊查、根据id查等等),或者查询的数据来自很多不同的库表或者系统,这时就很难以一个较快的速度(几百毫秒)直接获取我们想要的数据,而 elasticsearch 作为数据分析领域的佼佼者,刚好可以弥补这项不足,而我们要做的只需要将 mysql 中的数据同步到 elasticsearch 中即可,而 logstash 刚好就是一个同步神器,能够很好的满足我们的需求。
一:版本说明logstash:7.6.1
elasticsearch:7.5.2
jdk:1.8
下载对应平台的压缩包,解压即可使用。
使用logstash自带的logstash-inpu-jdbc插件,实现从mysql向es同步数据
官方插件使用说明:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
使用 logstash-input-jdbc 插件读取 mysql 的数据,原理就是定时执行一个 sql,然后将 sql 执行的结果写入到流中;增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段.
input { # 使用logstash-input-jdbc插件 jdbc { # mysql连接驱动包 jdbc_driver_library = "/data/www/logstash-7.6.1/mysql-connector-java-5.1.44.jar" # mysql连接驱动类 jdbc_driver_class = "com.mysql.jdbc.Driver" # mysql jdbc连接串 jdbc_connection_string = "jdbc:mysql://xxxxxxxxxxx" # 设置时区 jdbc_default_timezone = "Asia/Shanghai" # 设置连接数据库的用户名和密码 jdbc_user = "xxx" jdbc_password = "xxxxx" # 开启分页 jdbc_paging_enabled = "true" # 开启分页后,每页数据条数大小 jdbc_page_size = "1000" # 使用本地时区 plugin_timezone = "local" # 开启大小写敏感 lowercase_column_names = "false" # 同时时间间隔,分别对应”分 时 月 周 年“ 后面可以设置时区,也可不设,如果注释schedule,则只运行一次同步,进程及停止 schedule = "*/5 * * * * Asia/Shanghai" # 读取数据库使用的SQL文件,其中有一个需要特别说明的关键字:sql_last_value,此值是读取last_run_metadata_path文件内记录的值 statement_filepath = "/data/www/logstash-7.6.1/conf/xxxxxxx.sql" # 记录每次同步tracking_column_type追踪的值 last_run_metadata_path = "/data/www/logstash-7.6.1/conf/xxxx_modify_time" # 开启记录最后同步的值 record_last_run = "true" # 设置false为记录时间戳,true为其他字段的值 use_column_value = "false" # 追踪字段的类型,可选timestamp、numeric tracking_column_type = timestamp # 具体跟踪的同步表字段 tracking_column = "modify_time" # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 clean_run = "false" #filter部分对数据进行处理,此处只对@timestamp字段处理,设置为本地时间 filter { ruby { code = "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" ruby { code = "event.set('@timestamp',event.get('timestamp'))" mutate { remove_field = ["timestamp"] output { #向es同步数据的相关配置 elasticsearch { hosts = ["127.0.0.1:9200","127.0.0.1:9300"] #将数据中id字段设置为es中索引的_id,不使用es自动生成的随机_id,提供写入速度 document_id = "%{id}" #定义es中索引名称,如es没有创建或不存在,会自动创建,自动创建的索引可能会不满足要求,建议使用自行创建的索引 index = "xxxx_index" document_type = "_doc" # 以下配置为debug,可在测试中开启,方便定位问题,生产或正式使用务必关闭,以免产生大量日志 stdout { codec = rubydebug }jdbc_driver_library
mysql-connector-java-5.1.44.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。
jdbc_driver_class驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
sql_last_value标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。
statement执行同步的sql语句,可以同步部分数据;
statement_filepath存储执行同步的sql语句,不和statement同时使用;
schedule定时器,表示每隔多长时间同步一次数据,格式类似crontab;
tracking_column表示表中哪一列用于判断logstash同步的位置信息,与sql_last_value比较判断是否需要同步这条数据;
tracking_column_typeracking_column指定列的类型,支持两种类型:numeric(默认)、timestamp;
last_run_metadata_path存储sql_last_value值的文件名称及位置。
document_id生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。
注意??
1、同步单张表可以使用:logstash -f /path/to/xxxx.conf 2、同步多张表时,可通过pipeline管道的方式 3、当表数据超过百万级别时,建议分批同步,可按照每100w条数据同步一次来进行,以加快同步速度,具体可使用表id的区间来确定同步数量,则在SQL中增加id区间值。
多表同步的pipeline.yml配置如下:
- pipeline.id: xxxx1 pipeline.workers: 1 path.config: /data/www/logstash-7.6.1/member_conf/xxxx1.conf - pipeline.id: xxxx2 pipeline.workers: 1 path.config: /data/www/logstash-7.6.1/member_conf/xxxx2.conf - pipeline.id: xxxx3 pipeline.workers: 1 path.config: /data/www/logstash-7.6.1/member_conf/xxxx3.conf - pipeline.id: xxxx4 pipeline.workers: 1 path.config: /data/www/logstash-7.6.1/member_conf/xxxx4.conf四:启动
单表启动,指定conf配置文件的位置
bin/logstash -f config/logstash-mysql-es.conf llogstash.log 2 1
当使用多表同步时,启动时,不需要指定具体的conf配置文件,即:
bin/logstash logstash.log 2 1
信息化2.0时代提出开展智慧教育创新发展行动。2019年2月,中共中央、国务院印发...
从 10.0.0 版开始,异步迭代器就出现在 Node 中了,在本文中,我们将讨论异步迭...
摘要 元旦期间 订单业务线 告知 推送系统 无法正常收发消息,作为推送系统维护者...
Docker生成新镜像版本的两种方式 There are two ways Docker can generate new m...
本文整理自直播《Hologres 数据导入/导出实践-王华峰(继儒)》 视频链接: https:/...
2021年3月24日,主题为《数据的世界,世界的数据》的星环科技2021春季新品发布会...
在Python语言中有如下3种方法: 成员方法 类方法(classmethod) 静态方法(staticm...
前提条件 请您在购买前确保已完成注册和充值。详细操作请参见 如何注册公有云管...
建站 什么 虚拟主机 够用?这要看搭建的是什么类型的网站。比如个人博客类型的网...
【51CTO.com快译】 数据可视化工具不断发展,提供更强大的功能,同时改善可访问...