前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自制MYSQL旁挂审计

自制MYSQL旁挂审计

原创
作者头像
大大刺猬
发布2023-04-07 14:58:55
4.5K3
发布2023-04-07 14:58:55
举报
文章被收录于专栏:大大刺猬大大刺猬

背景需求

生产环境中, 难免有执行有问题的SQL, 这个时候就需要有人背锅找出原因. 找到相关时间点的二进制日志, 然后解析就可以找到出问题的SQL, 但 那是谁执行的呢? 谁都不承认. 这时候就需要审计了.

审计分类

嵌入式, 就是在mysql端做记录, 也就是审计插件. 优点:无需更改链路. 缺点:影响mysql性能

转发型, 就是转发相关包, 并记录下来, 比如我们之前写的mysql流量镜像脚本. 优点:可以只审计固定的流量(需要转发的) 缺点:延迟会增加一丢丢

旁挂型, 就是在链路旁边监控流量并做记录. 优点:不影响链路和mysql性能. 缺点:只能做记录, 无法控制连接.

本文就是实现旁挂型审计

实现原理

获取mysql包 然后 解析mysql包

获取mysql包, 我们使用scapy的sniff抓包, 解析mysql包, 之前就已经做过了.

网卡或者路由器把客户端的流量转发一份给审计即可
网卡或者路由器把客户端的流量转发一份给审计即可

我们只获取mysql客户端发给服务端的包, 所以只需要目标端口为mysql的端口即可. 如果是部署在非本机上, 还可以设置个目标主机过滤(可选)

代码语言:javascript
复制
sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)
#filter 过滤规则, 和tcpdump一样的语法
#iface 网卡名字(目标IP所要经过的网卡)
#prn 处理函数

对于mysql包处理, 由于ssl加密太复杂(要私钥,还有随机数...), 所以我们只解析非ssl的流量(应用服务器到数据库服务器很少有使用SSL的, 毕竟多数都是内网环境). 主要解析3种包. 连接包, COM_QUERY包(sql), Quit包(断开连接)
代码语言:javascript
复制
if bdata[:1] == b'\x03':
    sql = bdata[1:].decode()
    _psql = sqlparse.parse(sql)[0] #没有做更深层次的解析了, 毕竟不需要
    msg += f"SQL({_psql.get_type()}) {_psql.value}"

elif lbdata > 32 and len(set(bdata[9:32])) == 1:
    username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
    client_flag = btoint(bdata[:4])
    msg += f"CONNECT username:{username} client_flag:{client_flag}"
    
elif bdata[:1] == b'\x01':
    msg += f'QUIT '

演示

没有写接口出来, 直接修改源码(改下目标端口和网卡名字), 然后执行即可

代码语言:javascript
复制
python mysql_audit.py

客户端连接测试

代码语言:javascript
复制
mysql -h192.168.101.21 -P3308 -p123456  --skip-ssl #暂不支持SSL的审计

客户端退出测试

mysql退出的时候是发送的 0x01到server

代码语言:javascript
复制
mysql> exit

DDL/DML压测

先清空表吧, 压测工具可以参考ddcw_tool.benchmark_mysql

DDL是能记录的
DDL是能记录的

那个ping包是 workbench 在发

测试DML

虽然是模拟的数据, 但还是习惯性的打个码...
虽然是模拟的数据, 但还是习惯性的打个码...

总结

不影响性能的清空建议都上个审计.

本文使用的单进程做的, 可能存在性能问题(可以去掉SQL解析, 直接打印SQL文本).

本文的脚本不支持解析SSL.(如果是SSL包就直接跳过了).

旁挂型审计对业务和数据库影响最小(可以说没得影响), 但是无法控制连接,也无法解析ssl, 所以使用哪种审计得结合你的实际需求来.

附源代码

本工具直接将结果print的, 你也可以将结果保存在文件里(修改下printf函数即可)

代码语言:python
复制
from scapy.all import sniff
import sqlparse
import datetime

def btoint(bdata,t='little'):
	return int.from_bytes(bdata,t)

def printf(msg):
	print(msg)

def save_pack(pack):
	if hasattr(pack,'load') and len(pack.load) >= 5:
		bdata = pack.load
		msg = f"{str(datetime.datetime.now())} {pack['IP'].src}:{pack['TCP'].sport} "
		#msg = f"{str(datetime.datetime.now())} {pack['IP'].src}:{pack['TCP'].sport} {pack['IP'].dst}:{pack['TCP'].dport} "
		if len(bdata) == btoint(bdata[:3])+4: #不支持SSL
			bdata = bdata[4:]
		else:
			return None
		lbdata = len(bdata)


		PPACK = True
		if bdata[:1] == b'\x03':
			sql = bdata[1:].decode()
			_psql = sqlparse.parse(sql)[0]
			msg += f"SQL({_psql.get_type()}) {_psql.value}"
		elif bdata[:1] == b'\x16':
			msg += f"STMT_PREPARE {bdata[1:].decode()}"
		elif bdata[:1] == b'\x17':
			msg += f'STMT_EXECUTE {btoint(bdata[1:5])}' #懒得去解析具体的值了
		elif bdata[:1] == b'\x0E':
			msg += f'PING '
		elif bdata[:1] == b'\x01':
			msg += f'QUIT '
		elif lbdata > 32 and len(set(bdata[9:32])) == 1:
			username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
			client_flag = btoint(bdata[:4])
			msg += f"CONNECT username:{username} client_flag:{client_flag}"
		else:
			PPACK = False
		if PPACK:
			printf(msg)
			

sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景需求
  • 审计分类
  • 实现原理
  • 演示
    • 客户端连接测试
      • 客户端退出测试
        • DDL/DML压测
        • 总结
        • 附源代码
        相关产品与服务
        云数据库 MySQL
        腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
        http://www.vxiaotou.com