前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[MYSQL] 主从连接协议(2)--GTID

[MYSQL] 主从连接协议(2)--GTID

原创
作者头像
大大刺猬
发布2024-05-08 18:11:38
1050
发布2024-05-08 18:11:38
举报
文章被收录于专栏:大大刺猬大大刺猬

导读

之前有讲MYSQL连接协议, 也有讲过主从连接协议. 并附有相关python测试代码. 但对于主从连接的时候, GTID获取还是借用的现有的, 也就是没有做解析. 在我们解析了binlog之后. gtid信息就不在话下了. 格式就是PRE_GTID, 我这里就不再介绍了. 有兴趣的自己去看 https://www.modb.pro/db/1781217154309378048

GTID decode&encode

每次重写之前的功能, 都想着写得简单方便实用点. 于是就是有了gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")]) 这种看起来不友好, 但又简单实用的写法了. 尽是些花里胡哨的 -_-

代码语言:python
复制
class GTID(object):
	def __init__(self,data):
		if isinstance(data,bytes):
			self.bdata = data 
		else:
			self.data = data
		self.offset = 0
	def read(self,n):
		data = self.bdata[self.offset:self.offset+n]
		self.offset += n
		return data
	def encode(self,tdata=None):
		data = self.data if tdata is None else  tdata
		data = data.replace("\n","")
		gtid_number = 0
		ldata = data.rstrip(",").split(",")
		gtid = struct.pack("<Q",len(ldata))
		for x in ldata:
			gtid_number += 0
			_data = x.split(":")
			uid = binascii.unhexlify(_data[0].replace('-', ''))
			gtid += uid + struct.pack('<Q',len(_data[1:]))
			for y in _data[1:]:
				gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")])
		return gtid
	def decode(self,):
		# @https://github.com/ddcw/pymysqlbinlog : pymysqlbinlog/gtid_event.py
		self.offset = 0
		gtid_number = struct.unpack('<Q',self.read(8))[0]
		gtid_list = []
		for x in range(gtid_number):
			server_uid_bdata = self.read(16)
			if server_uid_bdata == b'':
				break
			sid = str(uuid.UUID(bytes=server_uid_bdata))
			group_gno_number = struct.unpack('<Q',self.read(8))[0]
			gtid_info = sid
			for y in range(group_gno_number):
				start = struct.unpack('<Q',self.read(8))[0]
				stop  = struct.unpack('<Q',self.read(8))[0]
				gtid_info += f":{start}-{stop}"
			gtid_list.append(gtid_info)
		return ",".join([ str(x) for x in gtid_list ])

演示

基本上只是对上一次的脚本做补充, 用法没变, 我这里就直接演示了. 直接展示又好像少了点什么, 所以我就和 pymysqlbinlog 结合起来了, 只打印DML语句信息. (基本上只是多个导包而已).

代码语言:python
复制
import t20240508
aa = t20240508.repl()
aa.server_id = 123456
aa.auto_position = True
aa.connect()
aa.request_dump("6d650f1f-ba4e-11ed-99ab-000c2980c11e:1-5") # 请求相关GTID. 包含结束的那个GTID值.(就是server端有个-1操作, 解析binlog的时候代码里面也有写 -1)
aa.parse_event()

附源码

我这里就只给t20240508.py的源码(大部分没变), 其它的之前都发过.

代码语言:python
复制
import testpymysql
import struct
import uuid
import binascii
from pymysqlbinlog.pymysqlbinlog import mysqlbinlog
from pymysqlbinlog.row_event import *

def _read_lenenc(bdata,i): 
	length = btoint(bdata[i:i+1])
	i += 1
	data = bdata[i:i+length]
	i += length
	return data,i

def event_header(bdata):
	timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",bdata[0:19])
	return {"timestamp":timestamp,'event_type':event_type,'server_id':server_id,'event_size':event_size,'log_pos':log_pos,'flags':flags,}

class GTID(object):
	def __init__(self,data):
		if isinstance(data,bytes):
			self.bdata = data 
		else:
			self.data = data
		self.offset = 0
	def read(self,n):
		data = self.bdata[self.offset:self.offset+n]
		self.offset += n
		return data
	def encode(self,tdata=None):
		data = self.data if tdata is None else  tdata
		data = data.replace("\n","")
		gtid_number = 0
		ldata = data.rstrip(",").split(",")
		gtid = struct.pack("<Q",len(ldata))
		for x in ldata:
			gtid_number += 0
			_data = x.split(":")
			uid = binascii.unhexlify(_data[0].replace('-', ''))
			gtid += uid + struct.pack('<Q',len(_data[1:]))
			for y in _data[1:]:
				gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")])
		return gtid
	def decode(self,):
		# @https://github.com/ddcw/pymysqlbinlog : pymysqlbinlog/gtid_event.py
		self.offset = 0
		gtid_number = struct.unpack('<Q',self.read(8))[0]
		gtid_list = []
		for x in range(gtid_number):
			server_uid_bdata = self.read(16)
			if server_uid_bdata == b'':
				break
			sid = str(uuid.UUID(bytes=server_uid_bdata))
			group_gno_number = struct.unpack('<Q',self.read(8))[0]
			gtid_info = sid
			for y in range(group_gno_number):
				start = struct.unpack('<Q',self.read(8))[0]
				stop  = struct.unpack('<Q',self.read(8))[0]
				gtid_info += f":{start}-{stop}"
			gtid_list.append(gtid_info)
		return ",".join([ str(x) for x in gtid_list ])
				


class repl(testpymysql.mysql,mysqlbinlog):
	def __init__(self,*args,**kwargs):
		#super().__init__(*args,**kwargs)
		self.filename = "None"
		testpymysql.mysql.__init__(self,)
		mysqlbinlog.__init__(self,filename="xx")
		self.server_id = 12345678 #uint32
		self.lport = 6666
		self.auto_position = False #Ture:gtid
		self.master_heartbeat_period = 30*1000000000 #设置心跳, 单位纳秒
		self.log_file = 'm3308.001037' #其实可以show master status的, 但是我懒得去查了...
		self.log_pos = 4
		#懒得去计算gtid了, 直接用我的环境的现成的,  计算方式可以参考: pymysqlreplication 的 gtid.py
		#使用: gtid.GtidSet(GTID_STR).encode()
		self.ROLLBACK = False
		self.checksum = None

	#sql/rpl_slave.cc  register_slave_on_master
	def register_slave(self):
		"""
		COM_FLAG: 1 (COM_REGISTER_SLAVE:21)
		server_id: 4
		host,user,password 0
		port: 2
		rpl_recovery_rank:4 (0)
		master_id:4 (0)
		"""
		bdata = struct.pack('<BLBBBHLL',21,self.server_id,0,0,0,self.lport,0,0)
		self._next_seq_id = 0 #每个com都清零
		self.write_pack(bdata)
		rpack = self.read_pack()
		return True if rpack[0:1] == b'\x00' else False

	#sql/rpl_slave.cc request_dump
	def request_dump(self,gtid):
		self.query(f'SET @master_heartbeat_period = {self.master_heartbeat_period}')
		#master_binlog_checksum, slave_uuid 算了
		bdata = b''
		if self.auto_position:
			"""
			COM_FLAG: 1 (COM_BINLOG_DUMP_GTID:30)
			binlog_flags: 2
			server_id: 4
			BINLOG_NAME_INFO_SIZE: 4
			BINLOG_NAME:
			POS: 8
			gtid_size: 4
			gtid:
			"""
			#regiest first
			aa = GTID(gtid)
			self.bgtid = aa.encode()
			bdata = struct.pack('<BHLLLQL',30,0,self.server_id,4,0,self.log_pos,len(self.bgtid)) + self.bgtid
			self.register_slave()
		else:
			"""
			COM_FLAG: 1 (COM_BINLOG_DUMP:18)
			binlog-pos: 4
			flags: 2
			server_id: 4
			binlog-filename:
			"""
			self.register_slave()
			bdata = struct.pack('<BLHL',18,self.log_pos,0,self.server_id) + self.log_file.encode()

		self._next_seq_id = 0
		self.write_pack(bdata)

	def read_event(self,):
		bdata = self.read_pack()
		if bdata[0:1] == b'\x00' and len(bdata) > 20:
			return bdata[1:]

	def event(self):
		while True:
			bdata = self.read_pack()
			if bdata[0:1] == b'\x00' and len(bdata) > 20:
				yield event_header(bdata[1:20])

	def parse_event(self):
		while True:
			bdata = self.read_pack()
			if bdata[0:1] == b'\x00' and len(bdata) > 20:
				bdata = bdata[1:]
				event_type =  struct.unpack('>B',bdata[4:5])[0]
				if event_type == 19:
					aa  = tablemap_event(bdata=bdata[19:],event_header=bdata[:19])
					aa.init()
					bdata = self.read_pack()[1:]
					bb = row_event(bdata=bdata[19:],tablemap=aa,event_header=bdata[:19])
					bb.init()
					print(bb.read_sql())

	def query(self,sql):
		self._next_seq_id = 0
		bdata = struct.pack('<B',3) + sql.encode()
		self.write_pack(bdata)
		#return self.result()

	def close(self):
		#self._next_seq_id = 0
		self.write_pack(struct.pack('<B',1))

其它

写本文的时候发生的事情, 记录一下.

之前环境迁移, 今天验证的时候,发现个别文件系统的部分文件目录丢失了, 在该文件系统的lost+found目录找到了相关文件和目录, 但未发现删除之类的信息, 虽然恢复起来很麻烦. 最终还是恢复成功了. 迁移前一定要做好备份

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导读
  • GTID decode&encode
  • 演示
  • 附源码
  • 其它
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com