使用DataX将MySQL数据库中的全量数据同步迁移到表格存储(Tablestore)中。

准备工作

  • 创建表格存储实例

    您需要在表格存储控制台创建实例,然后在实例详情页面获取实例的服务地址(Endpoint)。

  • 创建表格存储数据表
    您需要创建用于存放迁移数据的表格存储数据表,详情请参见创建数据表
    说明 创建数据表时建议使用MySQL原主键或唯一索引作为表格存储数据表的主键。
  • 获取AccessKey

    要接入表格存储服务,您需要拥有一个有效的AccessKey(包括AccessKeyId和AccessKeySecret)用来进行签名认证。详情请参见创建AccessKey

步骤一:下载DataX

DataX通过MySQL驱动使用Reader中的MySQL连接串配置,直接发送SQL语句获取到查询数据,这些数据会缓存在本地JVM中,然后Writer线程将这些数据写入到表格存储的表中。

您可以选择下载DataX的源代码(开源)进行本地编译,也可以直接下载编译好的压缩包。

  • 下载DataX的源代码
    安装Git工具,然后执行以下操作:
    git clone https://github.com/alibaba/DataX.git
  • 下载编译好的压缩包
    • DataX的GitHub地址上获得下载链接:
      https://github.com/alibaba/DataX
    • DataX压缩包

步骤二:Maven打包

具体操作步骤如下:
说明
  • 如果您下载的是编译好的DataX压缩包,跳过此步骤。
  • 此步骤会在本地编译各种数据源的Writer和Reader,会花费较长的时间,需要耐心等待。
  1. 进入到下载的源码目录,然后执行:
    mvn -U clean package assembly:assembly -Dmaven.test.skip=true
  2. 编译完成后,进入/target/datax/datax目录,查看相应的目录,其中:
    目录 说明
    bin 存放着可执行的datax.py文件,是整个DataX工具的入口。
    plugin 存放支持各种类型数据源的Reader和Writer。
    conf 存放core.json文件,文件中定义了一些默认参数值如channel流控、buffer大小等参数,建议使用默认值。

步骤三:准备全量导出的JSON文件

在DataX中,mysqlreader配置有两种模式querySQL模式和table模式。

  • querySQL模式(单task)
    querySQL模式一般用于有条件的数据导出。在此模式下,DataX不会再按照指定的column、table参数进行SQL的拼接,而是会直接略过这些配置(如果有),直接执行querysql语句。task数量总是1,因此在此模式下channel的配置不再有多线程的效果。querySQL模式的数据导出示例代码如下:
    {
        "job": {
    
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", #指定使用mysqlreader读取
                        "parameter": {
                            "username": "username",#mySQL用户名
                            "password": "password",#mySQL密码
                            "connection": [
                                {
                                    "querySql": [ #指定执行的SQL语句
                                        "select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' "
                                    ],
                                    "jdbcUrl": ["jdbc:mysql://10.10.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc连接串
                                    ]
                                }
                            ]
                        }
                    },
    
                    "writer": {
                        "name": "otswriter",#指定使用otswriter进行写入
                        "parameter": {#数据源配置
                            "endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#ots实例的endpoint
                            "accessId":"xxxx",
                            "accessKey":"xxxx",
                            "instanceName":"smoke-test",#实例名
                            "table":"vip_quota",#写入目标的table名称
                            # 下面是一些otswriter的限制项配置,默认可以不填,如果数据不符合以下规则的话,数据会被当成脏数据过滤掉
                            "requestTotalSizeLimitation": 1048576, #单行数据大小限制,默认1 MB,可不配置
                            "attributeColumnSizeLimitation": 2097152, #单个属性列大小限制,默认2 MB,可不配置
                            "primaryKeyColumnSizeLimitation": 1024, #单个主键列大小限制,默认配置1 KB,可不配置
                            "attributeColumnMaxCount": 1024, #属性列个数限制,默认1024,可不配置
    
                            "primaryKey":[#主键名称和类型
                                {"name":"bucket_name", "type":"string"},
                                {"name":"delta", "type":"int"},
                                {"name":"timestamp", "type":"int"}
                            ],
                            "column":[#其它column的名称和类型
                                {"name":"cdn_in","type":"int"},
                                {"name":"cdn_out","type":"int"},
                                {"name":"total_request","type":"int"}
                            ],
                            "writeMode":"UpdateRow"#写入模式
                        }
                    }
    
                }
            ]
        }
    }
  • table模式(多task)
    table模式下,您无需自己写select语句,而是由DataX根据JSON中的column、table、spliPk配置项,自行拼接SQL语句,观察执行日志如下:fig_mysqltablemodel
    table模式的数据导出示例代码如下:
    {
        "job": {
            "setting": {
                "speed": {
                     "channel": 3  #指定channel个数,这个参数跟并发数密切相关
                },
                "errorLimit": {#容错限制
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",#指定使用mysqlreader读取
                        "parameter": {
                            "username": "username",#mysql用户名
                            "password": "password",#mysql密码
                            "column": [  #table模式下可以指定需要查询哪些列
                                "bucket_name",      
                                "timestamp" ,
                                "delta" , 
                                "cdn_in", 
                                "cdn_out" ,
                                "total_request"
                            ],
                            "splitPk": "timestamp",#指定split字段
                            "connection": [
                                {
    
                                    "table": [#导出的表名
                                        "vip_quota"
                                     ],
                                    "jdbcUrl": ["jdbc:mysql://10.10.1.7:3306/db1"#jdbc连接串
                                    ]
                                }
                            ]
                        }
                    },
    
                    "writer": {
                        "name": "otswriter",#指定使用otswriter进行写入
                        "parameter": {#数据源配置
                            "endpoint":"https://smoke-test.xxxx.ots.aliyuncs.com",#ots实例的endpoint
                            "accessId":"xxx",
                            "accessKey":"xxx",
                            "instanceName":"smoke-test",#实例名
                            "table":"vip_quota",#写入目标的table名称
                            "primaryKey":[#主键名称和类型
                                {"name":"bucket_name", "type":"string"},
                                {"name":"delta", "type":"int"},
                                {"name":"timestamp", "type":"int"}
                            ],
                            "column":[#其它column的名称和类型
                                {"name":"haha","type":"int"},
                                {"name":"hahah","type":"int"},
                                {"name":"kengdie","type":"int"}
                            ],
                            "writeMode":"UpdateRow"#写入模式
                        }
                    }
    
                }
            ]
        }
    }
    上述JSON文件中定义了一次数据导出导入的数据源信息和少量系统配置。配置主要分两部分:
    • setting:主要是speed(跟速率、并发相关)和errorLimit(容错限制)
      • channel:个数决定了reader和writer的个数上限
      • splitPk:指定了splitPk字段,DataX会将MySQL表中数据按照splitPk切分成n段。splitPk的字段限制了必需是整型或者字符串类型。由于DataX的实现方式是按照spliPk字段分段查询数据库表,那么spliPk字段的选取应该尽可能的选择分布均匀且有索引的字段,例如主键id、唯一键等字段。如果不指定splitPk字段,DataX将不会进行数据的切分,并行度直接退化成1。
        说明 为了保证同步数据的一致性,要么不配置spliPk使用单线程,要么确保迁移期间停止该MySQL数据库的服务。
    • content:主要是数据源信息,包含reader和writer两部分
    说明 配置中的MySQL应该确保执行DataX任务的机器能够正常访问。

执行同步命令

python datax.py  -j"-Xms4g -Xmx4g" mysql_to_ots.json

-j"-Xms4g -Xmx4g"可以限制占用JVM内存的大小。如果不指定,将会使用conf/core.json中的配置,默认是1 GB。