当前位置:主页 > 查看内容

MySQL流转工具Maxwell的代码改造和优化小结

发布时间:2021-04-28 00:00| 位朋友查看

简介:Maxwell是开源产品,相比Canal的体量也小很多,综合考虑下,在短期内选择了Maxwell. 从快速上手到功能支持,算是一个总体支持还不错的产品,也让技术调研和迭代进度进展相对快了很多。 一般说要比较,基本都会拿出这幅图来(数据带有主观特点,仅供参考),因为……

 

Maxwell是开源产品,相比Canal的体量也小很多,综合考虑下,在短期内选择了Maxwell.

从快速上手到功能支持,算是一个总体支持还不错的产品,也让技术调研和迭代进度进展相对快了很多。

一般说要比较,基本都会拿出这幅图来(数据带有主观特点,仅供参考),因为考虑到bootstrap是个硬性需求,所以这部分的功能考量也是一个重要权衡。

最近在做数据库到大数据流转的过程中发现了Maxwell的一些问题和改进点:

1)Maxwell的服务管理模式目前只支持start模式,如果要停止只能采用手工kill的方式,相对粗暴一些,当然和作者交流,可以使用信号处理的方式来间接实现。

2)Maxwell的核心配置是对于同步对象的过滤,可以支持正则等模式,如果过滤规则较为复杂,或者后期不断的调整,每次调整都需要重新启动Maxwell服务,没有类似reload的模式。

3)对于DDL变更,如果Maxwell的初始化已完成,服务已启动,在后续创建一张表的时候,Maxwell会把变更记录至`schemas`表中维护版本变更记录,在已有的元数据表中`tables`和`columns`里面就没有这些信息了,对于后端的服务解析表结构会带来一些偏差(DDL的变更配置会有相应的JSON)

1.问题定位

这些问题算起来大多是建议,不算是致命问题,所以整体的进度依然可以继续推进,但是最近在做了几张大表的同步之后发现了一些数据问题。

1)bootstrap的时间比较长,查看Maxwell相关监控,整体的数据吞吐量在800条/秒左右,好像是达到了整个同步的瓶颈,同步一张200多万数据的表需要1个小时左右,相对比较长,我们在近期的测试中,几张千万级的大表如果串行初始化,差不多得2-3个小时,实在是太长了。

2)同步数据的时间字段值存在差异,这也是在中端(maxwell规划为中端服务)和后端(Flink,Kudu规划为后端)在做数据比对中发现的,bootstrap的数据比对结果几乎没有相同的,也就意味是bootstrap在数据方面存在一些潜在问题,所以整个事情轮盘到了Maxwell的bootstrap部分。

查看代码逻辑,着实让我一惊,这个问题目前仅在bootstrap的环节出现,比如数据的时间字段值为:

但是经过逻辑处理后,会有时区的计算,会自动补上时区的差异。

现在的问题已经不是初始化带来的性能隐患,而是数据质量出现了侵入性,导致数据看起来错乱。

对于这个问题,分析的重点就是时区的处理差异,本来想这个改动应该很小,没想到调试和环境集成着实花了不少功夫。

2.问题修正

对于时区的数据差异,主要在于datetime数据类型存在时区差异,目前差距在13个小时。

查看Maxwell的代码类SynchronousBootstrapper:

经过调试,需要改动的代码逻辑范围是基于函数setRowValues:

可以修改为:

改动之后,整个bootstrap的逻辑经过调试和反复测试就正常了。

3.性能问题的取舍和修正

当然在这个之外,也做了一些细小的改进。

第一个问题就是bootstrap的性能问题,之前看似乎是有瓶颈,吞吐量在800左右就上不去了,对此我做了如下的改进:

1).bootstrap的一个基本原理就是select * from xxxx order by id;这种使用模式,如果表数据量比较大,其实order by的部分看起来是走了主键,该子句会强制走全索引扫描,但是整体的效果反而不是全表扫描,所以我就干脆去除了逻辑中的order by子句。

整个逻辑的改造也很轻量:

  1. private ResultSet getAllRows(String databaseName, String tableName, Table table, String whereClause, 
  2.         if ( pk != null && !pk.equals("") ) { 
  3.             sql += String.format(" order by %s", pk); 
  4.         } 

2).去除了写入数据后的sleep 1毫秒

进一步分析代码,发现bootstrap中吞吐量的瓶颈是其中一个诡异的sleep 1的处理,根据初步分析,可能考虑到bootstrap的任务会产生大量数据,对于带宽和负载压力较大,通过sleep的方式能够做到降速,整体可控。

另外对于bootstrap的日志统计中会包括同步的数据条数,这个指标值目前的依赖度不高,而且数据校验的工作目前会先停止Slave再进行数据比对

性能提升和改进,在3-5倍左右,所以这个部分的逻辑我们可以根据实际情况取舍,在我们的流传设计中,数据都是基于Slave端进行流转的,所以不会对主库造成冲击,改动的这个部分的逻辑也很轻巧,注释掉sleep(1)即可。

  1. public void performBootstrap(BootstrapTask task, AbstractProducer producer, Long currentSchemaID) throws Exception { 
  2.                producer.push(row); 
  3.                Thread.sleep(1); 
  4.                ++insertedRows; 

改动后经过测试和对比,发现性能好了很多,最多的时候能有6000多,同样的初始化不到15分钟左右就全部搞定了。

这样一些细小的改进也给我们带来了一些成就感,后续的数据同步规模继续扩大,也没有再反馈过数据质量的问题,当然在这个基础上还有一些工作需要细化。

4.后续对于bootstrap方向的改进

1)使用分片的思路来完善bootstrap

提高数据提取的效率,对于千万级以上的大表数据抽取,可以按照区间分段来提取(需要考虑到数据的变更和写入的影响),目前的逻辑过于僵硬。

  1. private ResultSet getAllRows(String databaseName, String tableName, Table table, String whereClause, 
  2.                                 Connection connection) throws SQLException { 
  3.        Statement statement = createBatchStatement(connection); 
  4.        String pk = table.getPKString(); 
  5.        String sql = String.format("select * from `%s`.%s", databaseName, tableName); 
  6.        if ( whereClause != null && !whereClause.equals("") ) { 
  7.            sql += String.format(" where %s", whereClause); 
  8.        } 
  9.        if ( pk != null && !pk.equals("") ) { 
  10.            sql += String.format(" order by %s", pk); 
  11.        } 
  12.        return statement.executeQuery(sql); 
  13.    } 

2)数据字典索引优化

Maxwell数据字典的优化,目前的数据字典中,部分SQL执行频率较高,但是从数据库层面来看是全表扫描,这些细节的地方还需要进一步调整。

比如如下的SQL语句:

  1. >>explain select * from bootstrap where is_complete = 0 and client_id = 'dts_hb30_130_200_maxwell003' and (started_at is null or started_at <= now()) order by isnull(started_at), started_at asc, id asc

基于业务场景的改进和调整,也让我们通过真实场景的落地,更好的拥抱开源,并在一定程度上能够回馈和反哺。

本文转载自微信公众号「杨建荣的学习笔记」,可以通过以下二维码关注。转载本文请联系杨建荣的学习笔记公众号


本文转载自网络,原文链接:https://mp.weixin.qq.com/s/tSLewRvRqid8qM_OaYhLaQ
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐