首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

mongodb 源码实现系列 - command命令处理模块源码实现一

关于作者

前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz

1.?背景

???? <<transport_layer网络传输层模块源码实现>>中分享了mongodb内核底层网络IO处理相关实现,包括套接字初始化、一个完整mongodb报文的读取、获取到DB数据发送给客户端等。Mongodb支持多种增、删、改、查、聚合处理、cluster处理等操作,每个操作在内核实现中对应一个command,每个command有不同的功能,mongodb内核如何进行command源码处理将是本文分析的重点

此外,mongodb提供了mongostat工具来监控当前集群的各种操作统计。Mongostat监控统计如下图所示:

其中,insert、delete、update、query这四项统计比较好理解,分别对应增、删、改、查。但是,comand、getmore不是很好理解,command代表什么统计?getMore代表什么统计?,这两项相对比较难理解。

此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。

Command命令处理模块分为:mongos操作命令、mongod操作命令、mongodb集群内部命令,具体定义如下:

① mongos操作命令,客户端可以通过mongos访问集群相关的命令。

② mongod操作命令:客户端可以通过mongod复制集和cfg server访问集群的相关命令。

③ mongodb集群内部命令:mongos、mongod、mongo-cfg集群实例之间交互的命令。

???? Command命令处理模块核心代码实现如下:

???? 《command命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。

2.?<<transport_layer网络传输层模块源码实现>>衔接回顾

<<transport_layer网络传输层模块源码实现三>>一文中,我们对service_state_machine状态机调度子模块进行了分析,该模块中的dealTask任务进行mongodb内部业务逻辑处理,其核心实现如下:

代码语言:javascript
复制

1.//dealTask处理??
2.void?ServiceStateMachine::_processMessage(ThreadGuard?guard)?{??
3.    ......
4.????//command处理、DB访问后的数据通过dbresponse返回??
5.????DbResponse?dbresponse?=?_sep->handleRequest(opCtx.get(),?_inMessage);??
6.    ......
}

上面的sep对应mongod或者mongos实例的服务入口实现,该seq成员分别在如下代码中初始化为ServiceEntryPointMongod和ServiceEntryPointMongod类实现。SSM状态机的_seq成员初始化赋值核心代码实现如下:

代码语言:javascript
复制

1.//mongos实例启动初始化??
2.static?ExitCode?runMongosServer()?{??
3.????......??
4.????//mongos实例对应sep为ServiceEntryPointMongos??
5.????auto?sep?=?stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());??
6.????getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));??
7.????......??
8.}??
9.??
10.//mongod实例启动初始化??
11.ExitCode?_initAndListen(int?listenPort)?{??
12.????......??
13.????//mongod实例对应sep为ServiceEntryPointMongod??
14.????serviceContext->setServiceEntryPoint(??
15.????????stdx::make_unique<ServiceEntryPointMongod>(serviceContext));??
16.????......??
17.}??
18.??
19.//SSM状态机初始化??
20.ServiceStateMachine::ServiceStateMachine(...)??
21.????:?_state{State::Created},??
22.??????//mongod和mongos实例的服务入口通过这里赋值给_seq成员变量??
23.??????_sep{svcContext->getServiceEntryPoint()},??
24.??????......??
25.}?

通过上面的几个核心接口把mongos和mongod实例的服务入口与状态机SSM(ServiceStateMachine)联系起来,最终和下面的command命令处理模块关联。

? dealTask进行一次mongodb请求的内部逻辑处理,该处理由_sep->handleRequest()接口实现。由于mongos和mongod服务入口分别由ServiceEntryPointMongos和ServiceEntryPointMongod两个类实现,因此dealTask也就演变为如下接口处理:

① mongos实例:ServiceEntryPointMongos::handleRequest(...)

② Mongod实例::ServiceEntryPointMongod::handleRequest(...)

这两个接口入参都是OperationContext和Message,分别对应操作上下文、请求原始数据内容。下文会分析Message解析实现、OperationContext服务上下文实现将在后续章节分析。

Mongod和mongos实例服务入口类都继承自网络传输模块中的ServiceEntryPointImpl类,如下图所示:

Tips: mongos和mongod服务入口类为何要继承网络传输模块服务入口类?

原因是一个请求对应一个链接session,该session对应的请求又和SSM状态机唯一对应。所有客户端请求对应的SSM状态机信息全部保存再ServiceEntryPointImpl._sessions成员中,而command命令处理模块为SSM状态机任务中的dealTask任务,通过该继承关系,ServiceEntryPointMongod和ServiceEntryPointMongos子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的session链接信息。

3.?Mongodb协议解析

在《transport_layer网络传输层模块源码实现二》中的数据收发子模块完成了一个完整mongodb报文的接收,一个mongodb报文由Header头部+opCode包体组成,如下图所示:

上图中各个字段说明如下表:

opCode取值比较多,早期版本中OPINSERT、OPDELETE、OPUPDATE、OPQUERY分别针对增删改查请求,Mongodb从3.6版本开始默认使用OPMSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以OPMSG操作码对应协议为例进行分析,其他操作码协议分析过程类似,OP_MSG请求协议格式如下:

代码语言:javascript
复制

1.OP_MSG?{??
2.????//mongodb报文头部??
3.????MsgHeader?header;????????????
4.????//位图,用于标识报文是否需要校验?是否需要应答等??
5.????uint32?flagBits;???????????//?message?flags??
6.????//报文内容,例如find?write等命令内容通过bson格式存在于该结构中??
7.????Sections[]?sections;???????//?data?sections??
8.????//报文CRC校验??
9.????optional<uint32>?checksum;?//?optional?CRC-32C?checksum??
}??

OP_MSG各个字段说明如下表:

一个完整OP_MSG请求格式如下:

除了通用头部header外,客户端命令请求实际上都保存于sections字段中,该字段存放的是请求的原始bson格式数据。BSON是由10gen开发的一个数据格式,目前主要用于MongoDB中,是MongoDB的数据存储格式。BSON基于JSON格式,选择JSON进行改造的原因主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具有以下特性:

① Lightweight(更轻量级)

② Traversable(易操作)

③ Efficient(高效性能)

本文重点不是分析bson协议格式,bson协议实现细节将在后续章节分享。bson协议更多设计细节详见:http://bsonspec.org/

总结:一个完整mongodb报文由header+body组成,其中header长度固定为16字节,body长度等于messageLength-16。Header部分协议解析由message.cpp和message.h两源码文件实现,body部分对应的OP_MSG类请求解析由op_msg.cpp和op_msg.h两源码文件实现。

4. mongodb报文通用头部解析及封装源码实现

Header头部解析由src/mongo/util/net目录下message.cpp和message.h两文件完成,该类主要完成通用header头部和body部分的解析、封装。因此报文头部核心代码分为以下两类:

① 报文头部内容解析及封装(MSGHEADER命名空间实现)

② 头部和body内容解析及封装(MsgData命名空间实现)

4.1 mongodb报文头部解析及封装核心代码实现

mongodb报文头部解析由namespace MSGHEADER {...}实现,该类主要成员及接口实现如下:

代码语言:javascript
复制

1.namespace?MSGHEADER?{??
2.//header头部各个字段信息??
3.struct?Layout?{??
4.????//整个message长度,包括header长度和body长度??
5.????int32_t?messageLength;?????
6.????//requestID?该请求id信息??
7.????int32_t?requestID;?????????
8.????//getResponseToMsgId解析??
9.????int32_t?responseTo;????????
10.????//操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等??
11.????int32_t?opCode;??
12.};??
13.??
14.//ConstView实现header头部数据解析??
15.class?ConstView?{???
16.public:??
17.????......??
18.????//初始化构造??
19.????ConstView(const?char*?data)?:?_data(data)?{}??
20.????//获取_data地址??
21.????const?char*?view2ptr()?const?{??
22.????????return?data().view();??
23.????}??
24.????//TransportLayerASIO::ASIOSourceTicket::_headerCallback调用??
25.????//解析header头部的messageLength字段??
26.????int32_t?getMessageLength()?const?{??
27.????????return?data().read<LittleEndian<int32_t>>(offsetof(Layout,?messageLength));??
28.????}??
29.????//解析header头部的requestID字段??
30.????int32_t?getRequestMsgId()?const?{??
31.????????return?data().read<LittleEndian<int32_t>>(offsetof(Layout,?requestID));??
32.????}??
33.????//解析header头部的getResponseToMsgId字段??
34.????int32_t?getResponseToMsgId()?const?{??
35.????????return?data().read<LittleEndian<int32_t>>(offsetof(Layout,?responseTo));??
36.????}??
37.????//解析header头部的opCode字段??
38.????int32_t?getOpCode()?const?{??
39.????????return?data().read<LittleEndian<int32_t>>(offsetof(Layout,?opCode));??
40.????}??
41.??
42.protected:??
43.????//mongodb报文数据起始地址??
44.????const?view_type&?data()?const?{??
45.????????return?_data;??
46.????}??
47.private:??
48.????//数据部分??
49.????view_type?_data;??
50.};??
51.??
52.//View填充header头部数据??
53.class?View?:?public?ConstView?{??
54.public:??
55.????......??
56.????//构造初始化??
57.????View(char*?data)?:?ConstView(data)?{}??
58.????//header起始地址??
59.????char*?view2ptr()?{??
60.????????return?data().view();??
61.????}??
62.????//以下四个接口进行header填充??
63.????//填充header头部messageLength字段??
64.????void?setMessageLength(int32_t?value)?{??
65.????????data().write(tagLittleEndian(value),?offsetof(Layout,?messageLength));??
66.????}??
67.????//填充header头部requestID字段??
68.????void?setRequestMsgId(int32_t?value)?{??
69.????????data().write(tagLittleEndian(value),?offsetof(Layout,?requestID));??
70.????}??
71.????//填充header头部responseTo字段??
72.????void?setResponseToMsgId(int32_t?value)?{??
73.????????data().write(tagLittleEndian(value),?offsetof(Layout,?responseTo));??
74.????}??
75.????//填充header头部opCode字段??
76.????void?setOpCode(int32_t?value)?{??
77.????????data().write(tagLittleEndian(value),?offsetof(Layout,?opCode));??
78.????}??
79.private:??
80.????//指向header起始地址??
81.????view_type?data()?const?{??
82.????????return?const_cast<char*>(ConstView::view2ptr());??
83.????}??
84.};??
85.}

从上面的header头部解析、填充的实现类可以看出,header头部解析由MSGHEADER::ConstView实现;header头部填充由MSGHEADER::View完成。实际上代码实现上,通过offsetof来进行移位,从而快速定位到头部对应字段。

4.2 mongodb报文头部+body解析封装核心代码实现

Namespace MSGHEADER{...}命名空间只负责header头部的处理,namespace MsgData{...}命名空间相对MSGHEADER命名空间更加完善,除了处理头部解析封装外,还负责body数据起始地址维护、body数据封装、数据长度检查等。MsgData命名空间核心代码实现如下:

代码语言:javascript
复制

1.namespace?MsgData?{??
2.struct?Layout?{??
3.????//数据填充组成:header部分??
4.????MSGHEADER::Layout?header;??
5.????//数据填充组成:?body部分,body先用data占位置??
6.????char?data[4];??
7.};??
8.??
9.//解析header字段信息及body其实地址信息??
10.class?ConstView?{??
11.public:??
12.????//初始化构造??
13.????ConstView(const?char*?storage)?:?_storage(storage)?{}??
14.????//获取数据起始地址??
15.????const?char*?view2ptr()?const?{??
16.????????return?storage().view();??
17.????}??
18.??
19.????//以下四个接口间接执行前面的MSGHEADER中的头部字段解析??
20.????//填充header头部messageLength字段??
21.????int32_t?getLen()?const?{??
22.????????return?header().getMessageLength();??
23.????}??
24.????//填充header头部requestID字段??
25.????int32_t?getId()?const?{??
26.????????return?header().getRequestMsgId();??
27.????}??
28.????//填充header头部responseTo字段??
29.????int32_t?getResponseToMsgId()?const?{??
30.????????return?header().getResponseToMsgId();??
31.????}??
32.????//获取网络数据报文中的opCode字段??
33.????NetworkOp?getNetworkOp()?const?{??
34.????????return?NetworkOp(header().getOpCode());??
35.????}??
36.????//指向body起始地址??
37.????const?char*?data()?const?{??
38.????????return?storage().view(offsetof(Layout,?data));??
39.????}??
40.????//messageLength长度检查,opcode检查??
41.????bool?valid()?const?{??
42.????????if?(getLen()?<=?0?||?getLen()?>?(4?*?BSONObjMaxInternalSize))??
43.????????????return?false;??
44.????????if?(getNetworkOp()?<?0?||?getNetworkOp()?>?30000)??
45.????????????return?false;??
46.????????return?true;??
47.????}??
48.????......??
49.protected:??
50.????//获取_storage??
51.????const?ConstDataView&?storage()?const?{??
52.????????return?_storage;??
53.????}??
54.????//指向header起始地址??
55.????MSGHEADER::ConstView?header()?const?{??
56.????????return?storage().view(offsetof(Layout,?header));??
57.????}??
58.private:??
59.????//mongodb报文存储在这里??
60.????ConstDataView?_storage;??
61.};??
62.??
63.//填充数据,包括Header和body??
64.class?View?:?public?ConstView?{??
65.public:??
66.????//构造初始化??
67.????View(char*?storage)?:?ConstView(storage)?{}??
68.????......??
69.????//获取报文起始地址??
70.????char*?view2ptr()?{??
71.????????return?storage().view();??
72.????}??
73.??
74.????//以下四个接口间接执行前面的MSGHEADER中的头部字段构造??
75.????//以下四个接口完成msg?header赋值??
76.????//填充header头部messageLength字段??
77.????void?setLen(int?value)?{??
78.????????return?header().setMessageLength(value);??
79.????}??
80.????//填充header头部messageLength字段??
81.????void?setId(int32_t?value)?{??
82.????????return?header().setRequestMsgId(value);??
83.????}??
84.????//填充header头部messageLength字段??
85.????void?setResponseToMsgId(int32_t?value)?{??
86.????????return?header().setResponseToMsgId(value);??
87.????}??
88.????//填充header头部messageLength字段??
89.????void?setOperation(int?value)?{??
90.????????return?header().setOpCode(value);??
91.????}??
92.??
93.????using?ConstView::data;??
94.????//指向data??
95.????char*?data()?{??
96.????????return?storage().view(offsetof(Layout,?data));??
97.????}??
98.private:??
99.????//也就是报文起始地址??
100.????DataView?storage()?const?{??
101.????????return?const_cast<char*>(ConstView::view2ptr());??
102.????}??
103.????//指向header头部??
104.????MSGHEADER::View?header()?const?{??
105.????????return?storage().view(offsetof(Layout,?header));??
106.????}??
107.};??
108.??
109.......??
110.//Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度??
111.const?int?MsgDataHeaderSize?=?sizeof(Value)?-?4;??
112.??
113.//除去头部后的数据部分长度??
114.inline?int?ConstView::dataLen()?const?{???
115.????return?getLen()?-?MsgDataHeaderSize;??
116.}??
117.}??//?namespace?MsgData??

???? 和MSGHEADER命名空间相比,MsgData这个namespace命名空间接口实现和前面的MSGHEADER命名空间实现大同小异。MsgData不仅仅处理header头部的解析组装,还负责body部分数据头部指针指向、头部长度检查、opCode检查、数据填充等。其中,MsgData命名空间中header头部的解析构造底层依赖MSGHEADER实现。

4.3 Message/DbMessage核心代码实现

在《transport_layer网络传输层模块源码实现二》中,从底层ASIO库接收到的mongodb报文是存放在Message结构中存储,最终存放在ServiceStateMachine._inMessage成员中。

在前面第2章我们知道mongod和mongso实例的服务入口接口handleRequest(...)中都带有Message入参,也就是接收到的Message数据通过该接口处理。Message类主要接口实现如下:

代码语言:javascript
复制

1.//DbMessage._msg成员为该类型??
2.class?Message?{??
3.public:??
4.????//message初始化??
5.????explicit?Message(SharedBuffer?data)?:?_buf(std::move(data))?{}??
6.????//头部header数据??
7.????MsgData::View?header()?const?{??
8.????????verify(!empty());??
9.????????return?_buf.get();??
10.????}??
11.????//获取网络数据报文中的op字段??
12.????NetworkOp?operation()?const?{??
13.????????return?header().getNetworkOp();??
14.????}??
15.????//_buf释放为空??
16.????bool?empty()?const?{??
17.????????return?!_buf;??
18.????}??
19.????//获取报文总长度messageLength??
20.????int?size()?const?{??
21.????????if?(_buf)?{??
22.????????????return?MsgData::ConstView(_buf.get()).getLen();??
23.????????}??
24.????????return?0;??
25.????}??
26.????//body长度??
27.????int?dataSize()?const?{??
28.????????return?size()?-?sizeof(MSGHEADER::Value);??
29.????}??
30.????//buf重置??
31.????void?reset()?{??
32.????????_buf?=?{};??
33.????}??
34.????//?use?to?set?first?buffer?if?empty??
35.????//_buf直接使用buf空间??
36.????void?setData(SharedBuffer?buf)?{??
37.????????verify(empty());??
38.????????_buf?=?std::move(buf);??
39.????}??
40.?????//把msgtxt拷贝到_buf中??
41.????void?setData(int?operation,?const?char*?msgtxt)?{??
42.????????setData(operation,?msgtxt,?strlen(msgtxt)?+?1);??
43.????}??
44.????//根据operation和msgdata构造一个完整mongodb报文??
45.????void?setData(int?operation,?const?char*?msgdata,?size_t?len)?{??
46.????????verify(empty());??
47.????????size_t?dataLen?=?len?+?sizeof(MsgData::Value)?-?4;??
48.????????_buf?=?SharedBuffer::allocate(dataLen);??
49.????????MsgData::View?d?=?_buf.get();??
50.????????if?(len)??
51.????????????memcpy(d.data(),?msgdata,?len);??
52.????????d.setLen(dataLen);??
53.????????d.setOperation(operation);??
54.????}??
55.????......??
56.????//获取_buf对应指针??
57.????const?char*?buf()?const?{??
58.????????return?_buf.get();??
59.????}??
60.??
61.private:??
62.????//存放接收数据的buf??
63.????SharedBuffer?_buf;??
64.};??                                                                                                               

Message是操作mongodb收发报文最直接的实现类,该类主要完成一个完整mongodb报文封装。有关mongodb报文头后面的body更多的解析实现在DbMessage类中完成,DbMessage类包含Message类成员msg。实际上,Message报文信息在handleRequest(...)实例服务入口中赋值给DbMessage.msg,报文后续的body处理继续由DbMessage类相关接口完成处理。DbMessage和Message类关系如下:

代码语言:javascript
复制

1.class?DbMessage?{??
2.????......??
3.????//包含Message成员变量??
4.????const?Message&?_msg;??
5.    //mongodb报文起始地址
6.    const char* _nsStart; 
7.    //报文结束地址
8.    const char* _theEnd; 
9.}??
10.??
11.DbMessage::DbMessage(const?Message&?msg)?:?_msg(msg),???
12.??_nsStart(NULL),?_mark(NULL),?_nsLen(0)?{??
13.????//一个mongodb报文(header+body)数据的结束地址??
14.????_theEnd?=?_msg.singleData().data()?+?_msg.singleData().dataLen();??
15.????//报文起始地址?[_nextjsobj,?_theEnd?]之间的数据就是一个完整mongodb报文??
16.????_nextjsobj?=?_msg.singleData().data();??
17.????......??
}

DbMessage._msg成员为DbMessage?类型,DbMessage的_nsStart和_theEnd成员分别记录完整mongodb报文的起始地址和结束地址,通过这两个指针就可以获取一个完整mongodb报文的全部内容,包括header和body。

注意:DbMessage是早期mongodb版本(version<3.6)中用于报文body解析封装的类,这些类针对opCode=[dbUpdate, dbDelete]这个区间的操作。在mongodb新版本(version>=3.6)中,body解析及封装由op_msg.h和op_msg.cpp代码文件中的clase OpMsgRequest{}完成处理。

4.4 OpMsg报文解析封装核心代码实现

???? ?Mongodb从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。OP_MSG对应mongodb报文body解析封装处理由OpMsg类相关接口完成,OpMsg::parse(Message)从Message中解析出报文body内容,其核心代码实现如下:

代码语言:javascript
复制

1.struct?OpMsg?{???
2.??????......??
3.????//msg解析赋值见OpMsg::parse?????
4.????//各种命令(insert?update?find等)都存放在该body中??
5.????BSONObj?body;????
6.????//sequences用法暂时没看懂,感觉没什么用?先跳过??
7.????std::vector<DocumentSequence>?sequences;?//赋值见OpMsg::parse??
8.}??
代码语言:javascript
复制

1.//从message中解析出OpMsg信息??
2.OpMsg?OpMsg::parse(const?Message&?message)?try?{??
3.????//message不能为空,并且opCode必须为dbMsg??
4.????invariant(!message.empty());??
5.????invariant(message.operation()?==?dbMsg);??
6.????//获取flagBits??
7.????const?uint32_t?flags?=?OpMsg::flags(message);??
8.????//flagBits有效性检查,bit?0-15中只能对第0和第1位操作??
9.????uassert(ErrorCodes::IllegalOpMsgFlag,??
10.????????????str::stream()?<<?"Message?contains?illegal?flags?value:?Ob"??
11.??????????????????????????<<?std::bitset<32>(flags).to_string(),??
12.????????????!containsUnknownRequiredFlags(flags));??
13.??
14.????//校验码默认4字节??
15.????constexpr?int?kCrc32Size?=?4;??
16.????//判断该mongo报文body内容是否启用了校验功能??
17.????const?bool?haveChecksum?=?flags?&?kChecksumPresent;??
18.????//如果有启用校验功能,则报文末尾4字节为校验码??
19.????const?int?checksumSize?=?haveChecksum???kCrc32Size?:?0;??
20.????//sections字段内容??
21.????BufReader?sectionsBuf(message.singleData().data()?+?sizeof(flags),??
22.??????????????????????????message.dataSize()?-?sizeof(flags)?-?checksumSize);??
23.??
24.????//默认先设置位false??
25.????bool?haveBody?=?false;??
26.????OpMsg?msg;??
27.????//解析sections对应命令请求数据??
28.????while?(!sectionsBuf.atEof())?{??
29.????    //BufReader::read读取kind内容,一个字节??
30.????????const?auto?sectionKind?=?sectionsBuf.read<Section>();??
31.????    //kind为0对应命令请求body内容,内容通过bson报错??
32.????????switch?(sectionKind)?{??
33.????????    //sections第一个字节是0说明是body??
34.????????????case?Section::kBody:?{??
35.????????????????//默认只能有一个body??
36.????????????????uassert(40430,?"Multiple?body?sections?in?message",?!haveBody);??
37.????????????????haveBody?=?true;??
38.????????        //命令请求的bson信息保存在这里??
39.????????????????msg.body?=?sectionsBuf.read<Validated<BSONObj>>();??
40.????????????????break;??
41.????????????}??
42.??
43.????????    //DocSequence暂时没看明白,用到的地方很少,跳过,后续等??
44.????????????//该系列文章主流功能分析完成后,从头再回首分析??
45.????????????case?Section::kDocSequence:?{??
46.??????????????????......??
47.????????????}??
48.????????}??
49.????}??
50.????//OP_MSG必须有body内容??
51.????uassert(40587,?"OP_MSG?messages?must?have?a?body",?haveBody);??
52.????//body和sequence去重判断??
53.????for?(const?auto&?docSeq?:?msg.sequences)?{??
54.????????......??
55.????}??
56.????return?msg;??
}??

OpMsg类被OpMsgRequest类继承,OpMsgRequest类中核心接口就是解析出OpMsg.body中的库信息和表信息,OpMsgRequest类代码实现如下:

代码语言:javascript
复制

1.//协议解析得时候会用到,见runCommands??
2.struct?OpMsgRequest?:?public?OpMsg?{??
3.????......??
4.????//构造初始化??
5.????explicit?OpMsgRequest(OpMsg&&?generic)?:?OpMsg(std::move(generic))?{}??
6.????//opMsgRequestFromAnyProtocol->OpMsgRequest::parse???
7.????//从message中解析出OpMsg所需成员信息??
8.????static?OpMsgRequest?parse(const?Message&?message)?{??
9.????????//OpMsg::parse??
10.????????return?OpMsgRequest(OpMsg::parse(message));??
11.????}??
12.????//根据db?body?extraFields填充OpMsgRequest??
13.????static?OpMsgRequest?fromDBAndBody(...?{??
14.????????OpMsgRequest?request;??
15.????????request.body?=?([&]?{??
16.????????????//填充request.body??
17.????????????......??
18.????????}());??
19.????????return?request;??
20.????}??
21.????//从body中获取db?name??
22.????StringData?getDatabase()?const?{??
23.????????if?(auto?elem?=?body["$db"])??
24.????????????return?elem.checkAndGetStringData();??
25.????????uasserted(40571,?"OP_MSG?requests?require?a?$db?argument");??
26.????}??
27.????//find??insert?等命令信息??body中的第一个elem就是command?名??
28.????StringData?getCommandName()?const?{??
29.????????return?body.firstElementFieldName();??
30.????}??
};?

OpMsgRequest通过OpMsg::parse(message)解析出OpMsg信息,从而获取到body内容,GetCommandName()接口和getDatabase()则分别从body中获取库DB信息、命令名信息。通过该类相关接口,命令名(find、write、update等)和DB库都获取到了。

OpMsg模块除了OPMSG相关报文解析外,还负责OPMSG报文组装填充,该模块接口功能大全如下表:

5. Mongod实例服务入口核心代码实现

Mongod实例服务入口类ServiceEntryPointMongod继承ServiceEntryPointImpl类,mongod实例的报文解析处理、命令解析、命令执行都由该类负责处理。ServiceEntryPointMongod核心接口可以细分为:opCode解析及回调处理、命令解析及查找、命令执行三个子模块。

5.1 opCode解析及回调处理

???? OpCode操作码解析及其回调处理由ServiceEntryPointMongod::handleRequest(...)接口实现,核心代码实现如下:

代码语言:javascript
复制

1.//mongod服务对于客户端请求的处理????
2.//通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage??
3.DbResponse?ServiceEntryPointMongod::handleRequest(OperationContext*?opCtx,?const?Message&?m)?{??
4.????//获取opCode,3.6版本对应客户端默认使用OP_MSG??
5.????NetworkOp?op?=?m.operation();???
6.????......??
7.????//根据message构造DbMessage??
8.????DbMessage?dbmsg(m);??
9.????//根据操作上下文获取对应的client??
10.????Client&?c?=?*opCtx->getClient();????
11.????......??
12.????//获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息??
13.????const?char*?ns?=?dbmsg.messageShouldHaveNs()???dbmsg.getns()?:?NULL;??
14.????const?NamespaceString?nsString?=?ns???NamespaceString(ns)?:?NamespaceString();??
15.????....??
16.????//CurOp::debug?初始化opDebug,慢日志相关记录??
17.????OpDebug&?debug?=?currentOp.debug();??
18.????//慢日志阀值??
19.????long?long?logThresholdMs?=?serverGlobalParams.slowMS;??
20.????//时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作????
21.????bool?shouldLogOpDebug?=?shouldLog(logger::LogSeverity::Debug(1));??
22.????DbResponse?dbresponse;??
23.????if?(op?==?dbMsg?||?op?==?dbCommand?||?(op?==?dbQuery?&&?isCommand))?{??
24.????????//新版本op=dbMsg,因此走这里??
25.????????//从DB获取数据,获取到的数据通过dbresponse返回??
26.????????dbresponse?=?runCommands(opCtx,?m);?????
27.????}?else?if?(op?==?dbQuery)?{??
28.????????......???
29.????????//早期mongodb版本查询走这里??
30.????????dbresponse?=?receivedQuery(opCtx,?nsString,?c,?m);??
31.????}?else?if?(op?==?dbGetMore)?{????
32.????????//早期mongodb版本查询走这里??
33.????????dbresponse?=?receivedGetMore(opCtx,?m,?currentOp,?&shouldLogOpDebug);??
34.????}?else?{??
35.????????......??
36.????????//早期版本增?删?改走这里处理??
37.?????????if?(op?==?dbInsert)?{??
38.??????????????receivedInsert(opCtx,?nsString,?m);?//插入操作入口???新版本CmdInsert::runImpl??
39.?????????}?else?if?(op?==?dbUpdate)?{??
40.??????????????receivedUpdate(opCtx,?nsString,?m);?//更新操作入口????
41.?????????}?else?if?(op?==?dbDelete)?{??
42.??????????????receivedDelete(opCtx,?nsString,?m);?//删除操作入口????
43.?????????}???
44.????}??
45.????//获取runCommands执行时间,也就是内部处理时间??
46.????debug.executionTimeMicros?=?durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());??
47.????......??
48.????//慢日志记录??
49.????if?(shouldLogOpDebug?||?(shouldSample?&&?debug.executionTimeMicros?>?logThresholdMs?*?1000LL))?{??
50.????????Locker::LockerInfo?lockerInfo;????
51.????????//OperationContext::lockState??LockerImpl<>::getLockerInfo??
52.????????opCtx->lockState()->getLockerInfo(&lockerInfo);???
53.??
54.????//OpDebug::report?记录慢日志到日志文件??
55.????????log()?<<?debug.report(&c,?currentOp,?lockerInfo.stats);???
56.????}??
57.????//各种统计信息??
58.????recordCurOpMetrics(opCtx);??
59.}??

Mongod的handleRequest()接口主要完成以下工作:

① 从Message中获取OpCode,早期版本每个命令又对应取值,例如增删改查早期版本分别对应:dbInsert、dbDelete、dbUpdate、dbQuery;Mongodb 3.6开始,默认请求对应OpCode都是OP_MSG,本文默认只分析OpCode=OP_MSG相关的处理。

② 获取本操作对应的Client客户端信息。

③ 如果是早期版本,通过Message构造DbMessage,同时解析出库.表信息。

④ 根据不同OpCode执行对应回调操作,OP_MSG对应操作为runCommands(...),获取的数据通过dbresponse返回。

⑤ 获取到db层返回的数据后,进行慢日志判断,如果db层数据访问超过阀值,记录慢日志。

⑥ 设置debug的各种统计信息。

5.2命令解析及查找

从上面的分析可以看出,接口最后调用runCommands(...),该接口核心代码实现如下所示:

代码语言:javascript
复制

1.//message解析出对应command执行??
2.DbResponse?runCommands(OperationContext*?opCtx,?const?Message&?message)?{??
3.????//获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder??
4.????//应答数据通过该类构造??
5.????auto?replyBuilder?=?rpc::makeReplyBuilder(rpc::protocolForMessage(message));??
6.????[&]?{??
7.????????OpMsgRequest?request;??
8.????????try?{??//?Parse.??
9.????????????//协议解析?根据message获取对应OpMsgRequest??
10.????????????request?=?rpc::opMsgRequestFromAnyProtocol(message);??
11.????????}???
12.????}??
13.????try?{??//?Execute.??
14.????????//opCtx初始化??
15.????????curOpCommandSetup(opCtx,?request);??
16.????????//command初始化为Null??
17.????????Command*?c?=?nullptr;??
18.????????//OpMsgRequest::getCommandName查找??
19.????????if?(!(c?=?Command::findCommand(request.getCommandName())))?{???
20.?????????????//没有找到相应的command的后续异常处理??
21.?????????????......??
22.????????}??
23.????????//执行command命令,获取到的数据通过replyBuilder.get()返回??
24.????????execCommandDatabase(opCtx,?c,?request,?replyBuilder.get());??
25.????}??
26.????//OpMsgReplyBuilder::done对数据进行序列化操作??
27.????auto?response?=?replyBuilder->done();??
28.????//responseLength赋值??
29.????CurOp::get(opCtx)->debug().responseLength?=?response.header().dataLen();??
30.????//?返回??
31.????return?DbResponse{std::move(response)};??
}?

RunCommands(...)接口从message中解析出OpMsg信息,然后获取该OpMsg对应的command命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:

① 获取该OpCode对应replyBuilder,OP_MSG操作对应builder为OpMsgReplyBuilder。

② 根据message解析出OpMsgRequest数据,OpMsgRequest来中包含了真正的命令请求bson信息。

③ opCtx初始化操作。

④ 通过request.getCommandName()返回命令信息(如“find”、“update”等字符串)。

⑤ 通过Command::findCommand(command name)从CommandMap这个map表中查找是否支持该 command命令。如果没找到说明不支持,如果找到说明支持。

⑥ 调用execCommandDatabase(...)执行该命令,并获取命令的执行结果。

⑦ 根据command执行结果构造response并返回

5.3命令执行

代码语言:javascript
复制

1.void?execCommandDatabase(...)?{??
2.????......??
3.????//获取dbname??
4.????const?auto?dbname?=?request.getDatabase().toString();??
5.????......??
6.????//mab表存放从bson中解析出的elem信息??
7.????StringMap<int>?topLevelFields;??
8.????//body?elem解析??
9.????for?(auto&&?element?:?request.body)?{??
10.????????//获取bson中的elem信息??
11.????????StringData?fieldName?=?element.fieldNameStringData();??
12.????????//如果elem信息重复,则异常处理??
13.????????......??
14.????}??
15.????//如果是help命令,则给出help提示??
16.????if?(Command::isHelpRequest(helpField))?{??
17.????????//给出help提示??
18.????????Command::generateHelpResponse(opCtx,?replyBuilder,?*command);??
19.????????return;??
20.????}??
21.????//权限认证检查,检查该命令执行权限??
22.????uassertStatusOK(Command::checkAuthorization(command,?opCtx,?request));??
23.????......??
24.??
25.????//该命令执行次数统计??db.serverStatus().metrics.commands可以获取统计信息??
26.????command->incrementCommandsExecuted();??
27.????//真正的命令执行在这里面??
28.????retval?=?runCommandImpl(opCtx,?command,?request,?replyBuilder,?startOperationTime);??
29.????//该命令执行失败次数统计??
30.????if?(!retval)?{??
31.????????command->incrementCommandsFailed();??
32.?????}??
33.?????......??
}

execCommandDatabase(...)最终调用RunCommandImpl(...)进行对应命令的真正处理,该接口核心代码实现如下:

代码语言:javascript
复制

1.bool?runCommandImpl(...)?{??
2.????//获取命令请求内容body??
3.????BSONObj?cmd?=?request.body;??
4.????//获取请求中的DB库信息??
5.????const?std::string?db?=?request.getDatabase().toString();??
6.????//ReadConcern检查??
7.????Status?rcStatus?=?waitForReadConcern(??
8.????????opCtx,?repl::ReadConcernArgs::get(opCtx),?command->allowsAfterClusterTime(cmd));??
9.????//ReadConcern检查不通过,直接异常提示处理??
10.????if?(!rcStatus.isOK())?{??
11.?????????//异常处理??
12.?????????return;??
13.????}??
14.????if?(!command->supportsWriteConcern(cmd))?{??
15.????????//命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持??
16.????????if?(commandSpecifiesWriteConcern(cmd))?{??
17.????????????//异常处理"Command?does?not?support?writeConcern"??
18.????????????......??
19.????????????return?result;??
20.????????}??
21.????//调用Command::publicRun执行不同命令操作??
22.????????result?=?command->publicRun(opCtx,?request,?inPlaceReplyBob);??
23.????}??
24.????//提取WriteConcernOptions信息??
25.????auto?wcResult?=?extractWriteConcern(opCtx,?cmd,?db);??
26.????//提取异常,直接异常处理??
27.????if?(!wcResult.isOK())?{??
28.????????//异常处理??
29.????????......??
30.????????return?result;??
31.????}??
32.????......??
33.????//执行对应的命令Command::publicRun,执行不同命令操作??
34.????result?=?command->publicRun(opCtx,?request,?inPlaceReplyBob);??
35.????......??
36.}

???? RunCommandImpl(...)接口最终调用该接口入参的command,执行?command->publicRun(...)接口,也就是命令模块的公共publicRun。

5.4总结

Mongod服务入口首先从message中解析出opCode操作码,3.6版本对应客户端默认操作码为OP_MSQ,解析出该操作对应OpMsgRequest信息。然后从message原始数据中解析出command命令字符串后,继续通过全局Map表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。

6. Mongos实例服务入口核心代码实现

????? ?mongos服务入口核心代码实现过程和mongod服务入口代码实现流程几乎相同,mongos实例message解析、OP_MSG操作码处理、command命令查找等流程和上一章节mongod实例处理过程类似,本章节不在详细分析。Mongos实例服务入口处理调用流程如下:

ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)

最后的接口核心代码实现如下:

代码语言:javascript
复制

1.void?runCommand(...)?{??
2.????......??
3.????//获取请求命令name??
4.????auto?const?commandName?=?request.getCommandName();??
5.????//从全局map表中查找??
6.????auto?const?command?=?Command::findCommand(commandName);??
7.????//没有对应的command存在,抛异常说明不支持该命令??
8.????if?(!command)?{???
9.????????......??
10.????????return;??
11.????}???
12.????......??
13.????//执行命令??
14.????execCommandClient(opCtx,?command,?request,?builder);???
15.????......??
16.}??
17.
18.void?execCommandClient(...)??
19.{???
20.????......??
21.????//认证检查,是否有操作该command命令的权限,没有则异常提示??
22.????Status?status?=?Command::checkAuthorization(c,?opCtx,?request);????
23.????if?(!status.isOK())?{??
24.????????Command::appendCommandStatus(result,?status);??
25.????????return;??
26.????}??
27.????//该命令的执行次数自增,代理上面也是要计数的??
28.????c->incrementCommandsExecuted();???
29.????//如果需要command统计,则加1??
30.????if?(c->shouldAffectCommandCounter())?{??
31.????????globalOpCounters.gotCommand();??
32.????}??
33.????......??
34.????//有部分命令不支持writeconcern配置,报错??
35.????bool?supportsWriteConcern?=?c->supportsWriteConcern(request.body);??
36.????//不支持writeconcern又带有该参数的请求,直接异常处理"Command?does?not?support?writeConcern"??
37.????if?(!supportsWriteConcern?&&?!wcResult.getValue().usedDefault)?{??
38.????????......??
39.????????return;??
40.????}??
41.????//执行本命令对应的公共publicRun接口,Command::publicRun??
42.????ok?=?c->publicRun(opCtx,?request,?result);???
43.????......??
44.}??
  • Tips: mongos和mongod实例服务入口核心代码实现的一点小区别

① Mongod实例opCode操作码解析、OpMsg解析、command查找及对应命令调用处理都由class ServiceEntryPointMongod{...}类一起完成。

② mongos实例则把opCode操作码解析交由class ServiceEntryPointMongos{...}类实现,OpMsg解析、command查找及对应命令调用处理放到了clase Strategy{...}类来处理。

7.总结

?? Mongodb报文解析及组装流程总结

① 一个完整mongodb报文由通用报文header头部+body部分组成。

② Body部分内容,根据报文头部的opCode来决定不同的body内容。

③ 3.6版本对应客户端请求opCode默认为OP_MSG,该操作码对应body部分由flagBits + sections + checksum组成,其中sections中存放的是真正的命令请求信息,已bson数据格式保存。

④ Header头部和body报文体封装及解析过程由class Message {...}类实现

⑤ Body中对应command命令名、库名、表名的解析在mongodb(version<3.6)低版本协议中由class DbMessage {...}类实现

⑥ Body中对应command命令名、库名、表名的解析在mongodb(version<3.6)低版本协议中由struct OpMsgRequest{...}结构和struct OpMsg {...}类实现

?? Mongos和mongod实例的服务入口处理流程大同小异,整体处理流程如下:

① 从message解析出opCode操作码,根据不同操作码执行对应操作码回调。

② 根据message解析出OpMsg request信息,mongodb报文的命令信息就存储在该body中,该body已bson格式存储。

③ 从body中解析出command命令字符串信息(如“insert”、“update”等)。

④ 从全局_commands map表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。

⑤ 最终找到对应command命令后,执行command的功能run接口。

?? 图形化总结如下:

说明:第3章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析command命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。

Tips: 下期继续分享不同command命令执行细节。

8.遗留问题

???? 第1章节中的统计信息,将在command模块核心代码分析完毕后揭晓答案,《mongodb command命令处理模块源码实现二》中继续分析,敬请关注。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/ed28fa03590e58fd88e987c82
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券
http://www.vxiaotou.com