前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于查询的MySQL到ES的数据同步

基于查询的MySQL到ES的数据同步

原创
作者头像
保持热爱奔赴山海
修改2024-04-03 08:56:56
1270
修改2024-04-03 08:56:56
举报
文章被收录于专栏:饮水机管理员饮水机管理员

个别场景下,开发提需求,需要把某个MySQL里面指定数据同步到ES中,希望能有一个通用的脚本,用于特殊场景下的补数据或者临时性的数据同步。

注意: python es包的版本如果和es服务端的版本不一致的话,可能遇到报错。把python es的包版本换成和server端一致的版本即可。

下面的这个脚本,是用python+django+celery来实现上述功能的。核心代码如下:

方法1 逐条记录同步

代码语言:python
复制
# -*- coding: utf-8 -*-
# 根据MySQL表的update_time列同步增量数据到ES中,注意必须带上esId这个字段,这个值是作为ES的_id的


import os
import sys
import time
import mysql.connector
from elasticsearch import Elasticsearch


def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
			ssl_disabled=True,
        )
        # 注意这里把结果输出为dict格式
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
		# 发送钉钉告警
        sys.exit(10)

    mycursor.execute(sql_condition)
    res = mycursor.fetchall()

    es = Elasticsearch(es_addr, timeout=60, max_retries=10, retry_on_timeout=True)

    for i in res:
        try:
            es.index(index=es_index_name, body=dict(i), id=i['esId'])
        except Exception as e:
            # 发送钉钉告警
			pass
        finally:
            continue


if __name__ == '__main__':
    t1 = time.time()
    sync("DBA-测试",
         "192.168.31.181", 3306, "dts", "dts",
         "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
         ['http://192.168.31.181:8989'],
         'dba-test-new-2',
         )
    t2 = time.time()
    print(t2-t1)

方法2 批量同步方式【推荐用这种】

代码语言:python
复制
# -*- coding: utf-8 -*-
import time
import os
import sys

from elasticsearch import Elasticsearch,helpers
import mysql.connector  

def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
            ssl_disabled=True,
        )
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
        sys.exit(10)

    try:
        mycursor.execute(sql_condition)
        res = mycursor.fetchall()

        es = Elasticsearch(es_addr, request_timeout=60, max_retries=10, retry_on_timeout=True)
        
        # 准备bulk操作的数据  
        actions = (  
            {  
                "_index": es_index_name,  
                "_id": i['esId'],  
                "_source": dict(i)  
            } for i in res  
        )
 
        # 使用helpers.bulk进行批量写入  
        success, _ = helpers.bulk(es, actions)  
        print(f"Indexed {success} documents.")  

    except Exception as e:  
        print(f"Error indexing documents to Elasticsearch: {e}")  
    finally:  
        mycursor.close()  
        mydb.close()

if __name__ == '__main__':
    t1 = time.time()
    sync("DBA-测试",
         "192.168.31.181", 3306, "dts", "dts",
         "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
         ['http://192.168.31.181:8989'],
         'dba-test-new-2',
         )
    t2 = time.time()
    print(t2-t1)

耗时

代码语言:plaintext
复制
MySQL端记录数: 94326
行记录demo:
[sbtest]> select  * from sbtest3 limit 10,1 \G
*************************** 1. row ***************************
 id: 5685
  k: 50479
  c: 91674238320-66576604523-84892597271-42298112537-31748098687-87592861234-27236853894-78260103493-10155978333-85784381566
pad: 50437784151-86772162187-31166376983-54827989967-72340867827
ccc: NULL
1 row in set (0.00 sec)



逐条提交: 耗时 389秒
批量提交: 耗时 12秒

设置索引mapping,否则可能出现时间列格式等其它问题

代码语言:python
复制
PUT dba-test
{
  "mappings": {
    "properties": {
      "orderNo": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "esId": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "sex": {
        "type": "keyword"
      },
      "update_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
      },
      "remark": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}

celery 任务配置类似如下:

代码语言:python
复制
{
    "task_name": "dba-test",
    "mysql_host": "192.168.1.11",
    "mysql_port": 3306,
    "mysql_user": "dts",
    "mysql_pass": "dts",
    "sql_condition": "select a as esId,a as orderNo,name,sex,update_time,remark from sbtest.t1 where update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",
    "es_addr": [
        "192.168.1.110:9200",
        "192.168.1.111:9200",
        "192.168.1.112:9200"
    ],
    "es_index_name": "dba-test"
}

根据实际需要,celery里面可以使用周期性任务或者一次性任务。

此外,这里的sql_condition 也支持复杂条件,例如直接进行2表关联取值(注意esId不要重复就行):

代码语言:sql
复制
"sql_condition": "select b.a as esId,a.update_time,a.name,a.sex,b.addr,b.job from sbtest.t1 a inner join sbtest.t2 b on a.name=b.name where a.update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",

生产上,还需要接钉钉告警,如果数据同步失败,会及时通知,类似如下:

告警示例
告警示例

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 方法1 逐条记录同步
  • 方法2 批量同步方式【推荐用这种】
  • 耗时
  • 设置索引mapping,否则可能出现时间列格式等其它问题
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com