
Exploring an Implementation Approach for DTS Bidirectional Sync


A certain cloud vendor's DTS whitepaper describes the approach as follows:

Similar to one-way incremental sync, the service simulates a slave to obtain the incremental data. At the same time, UDTS tags the data it writes. When a new binlog event arrives, it first checks whether the tag is present: if it is, the event is loop-back data and is discarded directly; if not, the tag is added and the event is written to the peer.
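The mechanism can be sketched in a few lines (a minimal sketch with hypothetical helper names; the marker is an SQL comment that is carried through into the binlog event's statement text):

Code language: python

# A minimal sketch of the mark-and-check idea above; apply_to_peer is a
# hypothetical stand-in for the real write path to the other side.
MARKER = "/* dts */"

def forward_event(event_query, apply_to_peer):
    if MARKER in event_query:
        # written by the sync itself -> loop-back data, drop it
        return
    # a genuine local write: tag it, then replay it on the peer
    apply_to_peer(MARKER + " " + event_query)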

Based on this, we can build a bidirectional sync script DEMO on top of debezium (demo code only).

The mapping of binlog event types captured by debezium is as follows:

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-events

u → update

c → insert

d → delete
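For reference, this can be written as a small lookup table (a sketch; per the connector docs linked above, debezium additionally emits r for rows read during an initial snapshot):

Code language: python

# Debezium op code -> DML type, per the MySQL connector docs linked above.
OP_MAPPING = {
    "c": "insert",   # create
    "u": "update",
    "d": "delete",
    "r": "read",     # snapshot read, not an incremental change
}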

Examples of binlog events recorded by debezium:

Code language: json

-- insert example
{
    "before":null,
    "after":{
        "id":160120,
        "b":"provident optio veritatis conseq"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407157000,
        "snapshot":"false",
        "db":"db1",
        "sequence":null,
        "table":"t1",
        "server_id":300,
        "gtid":null,
        "file":"mysql-bin.000004",
        "pos":28589840,
        "row":9,
        "thread":182
    },
    "op":"c",
    "ts_ms":1672407157196,
    "transaction":null
}

-- delete example
{
    "before":{
        "id":114381,
        "b":"111"
    },
    "after":null,
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407489000,
        "snapshot":"false",
        "db":"db1",
        "sequence":null,
        "table":"t1",
        "server_id":300,
        "gtid":null,
        "file":"mysql-bin.000004",
        "pos":28590986,
        "row":0,
        "thread":154,
        "query":"delete from t1 where id=114381"
    },
    "op":"d",
    "ts_ms":1672407489300,
    "transaction":null
}

-- update example
{
    "before":{
        "id":114381,
        "b":"itaque error sit sunt aliquid it"
    },
    "after":{
        "id":114381,
        "b":"111"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407316000,
        "snapshot":"false",
        "db":"db1",
        "sequence":null,
        "table":"t1",
        "server_id":300,
        "gtid":null,
        "file":"mysql-bin.000004",
        "pos":28590602,
        "row":0,
        "thread":154,
        "query":"update t1 set b=111 where id=114381"
    },
    "op":"u",
    "ts_ms":1672407316221,
    "transaction":null
}

-- replace of a non-existing record
{
    "before":null,
    "after":{
        "id":114381,
        "b":"aaaaa"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407628000,
        "snapshot":"false",
        "db":"db1",
        "sequence":null,
        "table":"t1",
        "server_id":300,
        "gtid":null,
        "file":"mysql-bin.000004",
        "pos":28591342,
        "row":0,
        "thread":154,
        "query":"replace into t1 (id,b) values(114381,\"aaaaa\")"
    },
    "op":"c",
    "ts_ms":1672407628115,
    "transaction":null
}

-- replace of an existing record
{
    "before":{
        "id":114381,
        "b":"aaaaa"
    },
    "after":{
        "id":114381,
        "b":"bbbbbb"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407717000,
        "snapshot":"false",
        "db":"db1",
        "sequence":null,
        "table":"t1",
        "server_id":300,
        "gtid":null,
        "file":"mysql-bin.000004",
        "pos":28591701,
        "row":0,
        "thread":154,
        "query":"replace into t1 (id,b) values(114381,\"bbbbbb\")"
    },
    "op":"u",
    "ts_ms":1672407717924,
    "transaction":null
}
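Note that the source.query field, which the loop detection below relies on, is only populated when the MySQL option binlog_rows_query_log_events is enabled and the debezium connector is configured with include.query=true; otherwise debezium has no original statement text to report.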

If op=c and before is null, the event is an INSERT statement.
If op=u, the event is an UPDATE statement, which can be rewritten as REPLACE INTO.
If op=d and after is null, the event is a DELETE statement.
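These rules reduce to a small classifier (a sketch of the dispatch logic that the full demo below implements):

Code language: python

def classify_event(op, before, after):
    """Map a debezium change event to the DML to replay, per the rules above."""
    if op == "c" and before is None:
        return "insert"
    if op == "u":
        return "replace"  # updates are rewritten as REPLACE INTO
    if op == "d" and after is None:
        return "delete"
    return None  # unrecognized event type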

A demo implemented in Python (it still has some bugs at the moment; see the end of this post).

Configuration file configs.py

Code language: python
# Downstream MySQL connection info
mysql_host = "192.168.31.181"
mysql_port = "3306"
mysql_user = "dts"
mysql_pass = "dts"
mysql_db = "test"

# Kafka connection info
kafka_topic_name = "yyx.db1.t1"
kafka_group_id = "my-group"
kafka_bootstrap_servers = ["192.168.31.181:9092"]
kafka_auto_offset_reset = "earliest"    # allowed values: earliest, latest
kafka_auto_commit_interval_ms = 100  # note: this must be an integer
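Note: with the Debezium MySQL connector, topic names follow the pattern <topic.prefix>.<database>.<table>, which is how "yyx.db1.t1" lines up with the source name "yyx", database db1, and table t1 seen in the events above.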

Main program main_program.py

Code language: python
import json
import logging

import configs
import mysql.connector
from kafka import KafkaConsumer

logging.basicConfig(
    level=logging.DEBUG,
    filename="dts.log",
    filemode="a",
    format="%(asctime)s - "
    "%(pathname)s[line:%(lineno)d] - "
    "%(levelname)s: %(message)s",
)

mydb = mysql.connector.connect(
    host=configs.mysql_host,
    port=configs.mysql_port,
    user=configs.mysql_user,
    passwd=configs.mysql_pass,
    database=configs.mysql_db,
    autocommit=True,
)
mycursor = mydb.cursor()

consumer = KafkaConsumer(
    configs.kafka_topic_name,
    group_id=configs.kafka_group_id,
    bootstrap_servers=configs.kafka_bootstrap_servers,
    auto_offset_reset=configs.kafka_auto_offset_reset,
    auto_commit_interval_ms=configs.kafka_auto_commit_interval_ms,
    fetch_max_bytes=524288000,
    enable_auto_commit=True,
    max_poll_records=1000,
    send_buffer_bytes=1310720,
    receive_buffer_bytes=327680,
)


RUN_COUNTER = 0  # temporary counter added while debugging: counts attempted writes to the database

for message in consumer:
    #  print(f'KEY: {message.key}\nVALUE:{message.value}')
    print(f"KEY: {message.key}")
    cc = json.loads(message.value)
    # print(cc)

    op = cc.get("op")

    db = cc.get("source")["db"]
    tb = cc.get("source")["table"]

    db_tb = cc.get("source")["db"] + "." + cc.get("source")["table"]

    before_data = cc.get("before")
    after_data = cc.get("after")

    # source.query is only populated when binlog_rows_query_log_events and the
    # connector's include.query are enabled; fall back to "" so the marker
    # check below cannot fail on a missing value
    event_query = cc.get("source").get("query") or ""

    if "/* dts */" in event_query:
        print("debezium marker detected: this event was produced by the dts sync itself, skipping")
        continue

    # op=c and before is None -> an INSERT statement
    # op=u                    -> an UPDATE statement, rewritten as REPLACE INTO
    # op=d and after is None  -> a DELETE statement

    elif after_data is None and op == "d":
        print("this is a DELETE statement")
        EVENT_SQL = ""
        for i, v in enumerate(before_data):
            CONDITION = "`" + str(v) + "`" + "=" + "'" + str(before_data[v]) + "'"
            EVENT_SQL = EVENT_SQL + " and " + CONDITION
            # print(EVENT_SQL)
        EVENT_SQL = "DELETE FROM " + db_tb + " where 1=1 " + EVENT_SQL + ";"
        # print('extracted sql ---> ', EVENT_SQL)

    elif op in ("c", "u"):
        print("this is an INSERT or UPDATE statement; both are rewritten as REPLACE INTO")
        VALUES = ""
        COLUMNS = ""
        for i, v in enumerate(after_data):
            COLUMNS += "`" + str(v) + "`" + ","
            VALUES += "'" + str(after_data[v]).replace(r"'", r"\'") + "'" + ","
        # REPLACE INTO (not INSERT INTO) so replayed updates do not fail on duplicate keys
        EVENT_SQL = (
            "REPLACE INTO "
            + db_tb
            + " ("
            + COLUMNS[0:-1]
            + ") "
            + " values "
            + " ("
            + VALUES[0:-1]
            + ");"
        )
        # print('extracted sql ---> ', EVENT_SQL)

    else:
        print("unrecognized sql type")
        # EVENT_SQL = 'select 1;'
        continue

    QUERY_SQL = "/* dts */ " + EVENT_SQL

    # print('final sql executed by python ---> ', QUERY_SQL)

    with open("./QUERY_SQL.sql", "a+", encoding="utf-8") as f:
        f.write(QUERY_SQL + "\n")
    try:
        mycursor.execute(QUERY_SQL)  # autocommit is enabled on the connection
        RUN_COUNTER += 1
        print("attempted database writes ---> ", RUN_COUNTER)
    except Exception as e:
        with open("./err.log", "a+", encoding="utf-8") as f:
            f.write(str(e) + QUERY_SQL + "\n")
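The loop above builds SQL by string concatenation, so NULL column values become the literal string 'None' and values containing backslashes are not escaped. A safer variant (a sketch under the same event layout; build_replace is a hypothetical helper) passes the values as parameters and lets mysql.connector handle quoting and NULL:

Code language: python

# Hedged sketch of a parameterized rewrite; db_tb and after_data come from
# the surrounding consumer loop. mysql.connector maps Python None to SQL NULL.
def build_replace(db_tb, after_data):
    columns = ", ".join("`" + c + "`" for c in after_data)
    placeholders = ", ".join(["%s"] * len(after_data))
    sql = "REPLACE INTO " + db_tb + " (" + columns + ") VALUES (" + placeholders + ")"
    return sql, list(after_data.values())

# usage inside the consumer loop:
#   sql, params = build_replace(db_tb, after_data)
#   mycursor.execute("/* dts */ " + sql, params)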

Other notes

The data in Kafka is produced via the CDC setup described in 《基于debezium的双向复制初探.docx》.
The read and parsing logic in this code is tightly coupled to debezium's event format.

Write throughput: sequential writes to MySQL with autocommit come to roughly 30,000 rows per minute.

Testing has surfaced an as-yet unidentified bug:
If 10,000 records are written in one batch, no data is lost on the dts-to-dest path.
If 50,000 or more records are written in one batch, data is lost on the dts-to-dest path.
The data in Kafka has been verified to be complete, so the problem lies in the dts-to-dest write path, yet the try/except captures no error log corresponding to the lost rows.
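One way to narrow the loss down (a sketch, assuming an integer primary key id on both ends and hypothetical src_db/dest_db connections): diff the id sets after a batch, then grep the missing ids in the QUERY_SQL.sql audit file the demo already writes to see whether those rows ever reached the write path.

Code language: python

# Hedged debugging sketch: which ids exist on the source but not on dest?
def missing_ids(src_db, dest_db, table):
    src_cur, dest_cur = src_db.cursor(), dest_db.cursor()
    src_cur.execute("SELECT id FROM " + table)
    dest_cur.execute("SELECT id FROM " + table)
    src_ids = {row[0] for row in src_cur.fetchall()}
    dest_ids = {row[0] for row in dest_cur.fetchall()}
    return sorted(src_ids - dest_ids)

# usage: print(missing_ids(src_db, dest_db, "t1"))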
