生产环境中, 难免有执行有问题的SQL, 这个时候就需要有人背锅找出原因. 找到相关时间点的二进制日志, 然后解析就可以找到出问题的SQL, 但 那是谁执行的呢? 谁都不承认. 这时候就需要审计了.
嵌入式, 就是在mysql端做记录, 也就是审计插件. 优点:无需更改链路. 缺点:影响mysql性能
转发型, 就是转发相关包, 并记录下来, 比如我们之前写的mysql流量镜像脚本. 优点:可以只审计固定的流量(需要转发的) 缺点:延迟会增加一丢丢
旁挂型, 就是在链路旁边监控流量并做记录. 优点:不影响链路和mysql性能. 缺点:只能做记录, 无法控制连接.
本文就是实现旁挂型审计
获取mysql包 然后 解析mysql包
获取mysql包, 我们使用scapy的sniff抓包, 解析mysql包, 之前就已经做过了.
我们只获取mysql客户端发给服务端的包, 所以只需要目标端口为mysql的端口即可. 如果是部署在非本机上, 还可以设置个目标主机过滤(可选)
sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)
#filter 过滤规则, 和tcpdump一样的语法
#iface 网卡名字(目标IP所要经过的网卡)
#prn 处理函数
if bdata[:1] == b'\x03':
sql = bdata[1:].decode()
_psql = sqlparse.parse(sql)[0] #没有做更深层次的解析了, 毕竟不需要
msg += f"SQL({_psql.get_type()}) {_psql.value}"
elif lbdata > 32 and len(set(bdata[9:32])) == 1:
username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
client_flag = btoint(bdata[:4])
msg += f"CONNECT username:{username} client_flag:{client_flag}"
elif bdata[:1] == b'\x01':
msg += f'QUIT '
没有写接口出来, 直接修改源码(改下目标端口和网卡名字), 然后执行即可
python mysql_audit.py
mysql -h192.168.101.21 -P3308 -p123456 --skip-ssl #暂不支持SSL的审计
mysql退出的时候是发送的 0x01到server
mysql> exit
先清空表吧, 压测工具可以参考ddcw_tool.benchmark_mysql
那个ping包是 workbench 在发
测试DML
不影响性能的清空建议都上个审计.
本文使用的单进程做的, 可能存在性能问题(可以去掉SQL解析, 直接打印SQL文本).
本文的脚本不支持解析SSL.(如果是SSL包就直接跳过了).
旁挂型审计对业务和数据库影响最小(可以说没得影响), 但是无法控制连接,也无法解析ssl, 所以使用哪种审计得结合你的实际需求来.
本工具直接将结果print的, 你也可以将结果保存在文件里(修改下printf函数即可)
from scapy.all import sniff
import sqlparse
import datetime
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
def printf(msg):
print(msg)
def save_pack(pack):
if hasattr(pack,'load') and len(pack.load) >= 5:
bdata = pack.load
msg = f"{str(datetime.datetime.now())} {pack['IP'].src}:{pack['TCP'].sport} "
#msg = f"{str(datetime.datetime.now())} {pack['IP'].src}:{pack['TCP'].sport} {pack['IP'].dst}:{pack['TCP'].dport} "
if len(bdata) == btoint(bdata[:3])+4: #不支持SSL
bdata = bdata[4:]
else:
return None
lbdata = len(bdata)
PPACK = True
if bdata[:1] == b'\x03':
sql = bdata[1:].decode()
_psql = sqlparse.parse(sql)[0]
msg += f"SQL({_psql.get_type()}) {_psql.value}"
elif bdata[:1] == b'\x16':
msg += f"STMT_PREPARE {bdata[1:].decode()}"
elif bdata[:1] == b'\x17':
msg += f'STMT_EXECUTE {btoint(bdata[1:5])}' #懒得去解析具体的值了
elif bdata[:1] == b'\x0E':
msg += f'PING '
elif bdata[:1] == b'\x01':
msg += f'QUIT '
elif lbdata > 32 and len(set(bdata[9:32])) == 1:
username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
client_flag = btoint(bdata[:4])
msg += f"CONNECT username:{username} client_flag:{client_flag}"
else:
PPACK = False
if PPACK:
printf(msg)
sniff(filter=f'dst port 3308', iface='ens32', prn=save_pack)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。