首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

mongodb源码实现系列-网络传输层模块实现二

关于作者

?前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz

1.?说明

??? mongodb源码实现系列文章有前后逻辑关系,阅读本文前,请提前阅读<<mongodb网络模块源码实现及性能调优一>>

在之前的<<mongodb网络模块源码实现及性能调优一>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、transport传输层网络模块中线程模型实现,但是由于篇幅原因,传输层网络模块中的以下模块实现原理没有分析,本文降将继续分析遗留的以下子模块:

  1. transport_layer套接字处理及传输层管理子模块
  2. session会话子模块
  3. Ticket数据收发子模块
  4. service_entry_point服务入口点子模块
  5. service_state_machine状态机子模块(该《模块在网络传输层模块源码实现三》中分析)
  6. service_executor线程模型子模块(该《模块在网络传输层模块源码实现四》中分析)

2.?transport_layer套接字处理及传输层管理子模块

transport_layer套接字处理及传输层管理子模块功能包括套接字相关初始化处理、结合asio库实现异步accept处理、不同线程模型管理及初始化等,该模块的源码实现主要由以下几个文件实现:

上图是套接字处理及传输层管理子模块源码实现的相关文件,其中mock和test文件主要用于模拟测试等,所以真正核心的代码实现只有下表的几个文件,对应源码文件功能说明如下表所示:

2.1核心代码实现

该子模块核心代码主要由TransportLayerManager类和TransportLayerASIO类相关接口实现。

2.1.1 ?TransportLayerManager类核心代码实现

TransportLayerManager类主要成员及接口如下:

代码语言:javascript
复制

1.//网络会话链接,消息处理管理相关的类,在createWithConfig构造该类存入_tls??
2.class?TransportLayerManager?final?:?public?TransportLayer?{??
3.????//以下四个接口真正实现在TransportLayerASIO类中具体实现??
4.????Ticket?sourceMessage(...)?override;??
5.????Ticket?sinkMessage(...)?override;??????
6.????Status?wait(Ticket&&?ticket)?override;??
7.????void?asyncWait(...)?override;??
8.????//配置初始化实现??
9.????std::unique_ptr<TransportLayer>?createWithConfig(...);??
10.??
11.????//createWithConfig中赋值,对应TransportLayerASIO,??
12.????//实际上容器中就一个成员,就是TransportLayerASIO??
13.????std::vector<std::unique_ptr<TransportLayer>>?_tls;??
14.};??

TransportLayerManager类包含一个_tls成员,该类最核心的createWithConfig接口代码实现如下:

代码语言:javascript
复制

15.//根据配置构造相应类信息??_initAndListen中调用??
16.std::unique_ptr<TransportLayer>?TransportLayerManager::createWithConfig(...)?{??
17.????std::unique_ptr<TransportLayer>?transportLayer;??
18.????//服务类型,也就是本实例是mongos还是mongod??
19.????//mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos??
20.????auto?sep?=?ctx->getServiceEntryPoint();??
21.????//net.transportLayer配置模式,默认asio,?legacy模式已淘汰??
22.????if?(config->transportLayer?==?"asio")?{??
23.????     //同步方式还是异步方式,默认synchronous??
24.????????if?(config->serviceExecutor?==?"adaptive")?{??
25.????????    //动态线程池模型,也就是异步模式??
26.????????????opts.transportMode?=?transport::Mode::kAsynchronous;??
27.????????}?else?if?(config->serviceExecutor?==?"synchronous")?{??
28.????????????//一个链接一个线程模型,也就是同步模式??
29.????????????opts.transportMode?=?transport::Mode::kSynchronous;??
30.????????}???
31.????    //如果配置是asio,构造TransportLayerASIO类??
32.????    auto?transportLayerASIO?=?stdx::make_unique<transport::TransportLayerASIO>(opts,?sep);??
33.???    ?if?(config->serviceExecutor?==?"adaptive")?{?//异步方式??
34.????????     //构造动态线程模型对应的执行器ServiceExecutorAdaptive??
35.????????????ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>(??
36.????????????????ctx,?transportLayerASIO->getIOContext()));??
37.??????? ?}?else?if?(config->serviceExecutor?==?"synchronous")?{?//同步方式??
38.????????????//构造一个链接一个线程模型对应的执行器ServiceExecutorSynchronous??
39.????????????ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx));??
40.???????? }??
41.????     //transportLayerASIO转换为transportLayer类??
42.???????? transportLayer?=?std::move(transportLayerASIO);??
43.????}???
44.???//transportLayer转存到对应retVector数组中并返回??
45.????std::vector<std::unique_ptr<TransportLayer>>?retVector;??
46.????retVector.emplace_back(std::move(transportLayer));??
47.????return?stdx::make_unique<TransportLayerManager>(std::move(retVector));??
48.}??

createWithConfig函数根据配置文件来确定对应的TransportLayer,如果net.transportLayer配置为”asio”,则选用TransportLayerASIO类来进行底层的网络IO处理,如果配置为”legacy”,则选用TransportLayerLegacy。”legacy”模式当前已淘汰,本文只分析”asio”模式实现。

“asio”模式包含两种线程模型:adaptive(动态线程模型)和synchronous(同步线程模型)。adaptive模式线程设计采用动态线程方式,线程数和mongodb压力直接相关,如果mongodb压力大,则线程数增加;如果mongodb压力变小,则线程数自动减少。同步线程模式也就是一个链接一个线程模型,线程数的多少和链接数的多少成正比,链接数越多则线程数也越大。

Mongodb内核实现中通过opts.transportMode来标记asio的线程模型,这两种模型对应标记如下:

???? 说明:adaptive线程模型被标记为KAsynchronous,synchronous被标记为KSynchronous是有原因的,adaptive动态线程模型网络IO处理借助epoll异步实现,而synchronous一个链接一个线程模型网络IO处理是同步读写操作。Mongodb网络线程模型具体实现及各种优缺点可以参考:Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计

2.1.2 TransportLayerASIO类核心代码实现

TransportLayerASIO类核心成员及接口如下:

代码语言:javascript
复制

1.class?TransportLayerASIO?final?:?public?TransportLayer?{??
2.????//以下四个接口主要和套接字数据读写相关??
3.????Ticket?sourceMessage(...);??
4.????Ticket?sinkMessage(...);??
5.????Status?wait(Ticket&&?ticket);??
6.????void?asyncWait(Ticket&&?ticket,?TicketCallback?callback);??
7.????void?end(const?SessionHandle&?session);??
8.????//新链接处理??
9.????void?_acceptConnection(GenericAcceptor&?acceptor);??
10.??????
11.????//adaptive线程模型网络IO上下文处理??
12.????std::shared_ptr<asio::io_context>?_workerIOContext;???
13.????//accept接收客户端链接对应的IO上下文??
14.????std::unique_ptr<asio::io_context>?_acceptorIOContext;????
15.????//bindIp配置中的ip地址列表,用于bind监听,accept客户端请求??
16.????std::vector<std::pair<SockAddr,?GenericAcceptor>>?_acceptors;??
17.????//listener线程负责接收客户端新链接??
18.????stdx::thread?_listenerThread;??
19.????//服务类型,也就是本实例是mongos还是mongod??
20.????//mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos??
21.????ServiceEntryPoint*?const?_sep?=?nullptr;??
22.????//当前运行状态??
23.????AtomicWord<bool>?_running{false};??
24.????//listener处理相关的配置信息??
25.????Options?_listenerOptions;??
26.}??

从上面的类结构可以看出,该类主要通过listenerThread线程完成bind绑定及listen监听操作,同时部分接口实现新连接上的数据读写。

套接字初始化代码实现如下:

代码语言:javascript
复制

1.Status?TransportLayerASIO::setup()?{??
2.????std::vector<std::string>?listenAddrs;??
3.    //如果没有配置bindIp,则默认监听"127.0.0.1:27017"
4.????if?(_listenerOptions.ipList.empty())?{??
5.????????listenAddrs?=?{"127.0.0.1"};??
6.????}?else?{??
7.????????//配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗号分隔符获取ip列表存入ipList??
8.????????boost::split(listenAddrs,?_listenerOptions.ipList,?boost::is_any_of(","),?boost::token_compress_on);??
9.????}??
10.????//遍历ip地址列表??
11.????for?(auto&?ip?:?listenAddrs)?{??
12.????    //根据IP和端口构造对应SockAddr结构??
13.????????const?auto?addrs?=?SockAddr::createAll(??
14.????????????ip,?_listenerOptions.port,?_listenerOptions.enableIPv6???AF_UNSPEC?:?AF_INET);??
15.????????......??
16.????????//根据addr构造endpoint??
17.????????asio::generic::stream_protocol::endpoint?endpoint(addr.raw(),?addr.addressSize);??
18.????    //_acceptorIOContext和_acceptors关联??
19.????????GenericAcceptor?acceptor(*_acceptorIOContext);??
20.????    //epoll注册,也就是fd和epoll关联??
21.????????//basic_socket_acceptor::open??
22.????????acceptor.open(endpoint.protocol());???
23.????     //SO_REUSEADDR配置?basic_socket_acceptor::set_option??
24.????????acceptor.set_option(GenericAcceptor::reuse_address(true));??
25.????    //非阻塞设置?basic_socket_acceptor::non_blocking??
26.????????acceptor.non_blocking(true,?ec);????
27.????????//bind绑定????
28.????????acceptor.bind(endpoint,?ec);???
29.????????if?(ec)?{??
30.????????????return?errorCodeToStatus(ec);??
31.????????}??
32.????}??
}

从上面的分析可以看出,代码实现首先解析出配置文件中bindIP中的ip:port列表,然后遍历列表绑定所有服务端需要监听的ip:port,每个ip:port对应一个GenericAcceptor?,所有acceptor和全局accept IO上下文_acceptorIOContext关联,同时bind()绑定所有ip:port。

??? ??Bind()绑定所有配置文件中的Ip:port后,然后通过TransportLayerASIO::start()完成后续处理,该接口代码实现如下:

代码语言:javascript
复制

1.//_initAndListen中调用执行???
2.Status?TransportLayerASIO::start()?{?//listen线程处理??
3.????......??
4.????//这里专门起一个线程做listen相关的accept事件处理??
5.????_listenerThread?=?stdx::thread([this]?{??
6.????????//修改线程名??
7.????????setThreadName("listener");???
8.????????//该函数中循环处理accept事件??
9.????????while?(_running.load())?{??
10.????????????asio::io_context::work?work(*_acceptorIOContext);???
11.????????????try?{??
12.????????????????//accept事件调度处理??
13.????????         _acceptorIOContext->run();????
14.????????????}?catch?(...)?{?//异常处理??
15.????????????????severe()?<<?"Uncaught?exception?in?the?listener:?"?<<?exceptionToStatus();??
16.????????????????fassertFailed(40491);??
17.????????????}??
18.????????}??
19.????});???
20.???遍历_acceptors,进行listen监听处理??
21.???for?(auto&?acceptor?:?_acceptors)?{???
22.????????acceptor.second.listen(serverGlobalParams.listenBacklog);??
23.????    //异步accept回调注册在该函数中??
24.????????_acceptConnection(acceptor.second);???????
25.????}??
26.}

从上面的TransportLayerASIO::start()接口可以看出,mongodb特地创建了一个listener线程用于客户端accept事件处理,然后借助ASIO网络库的acceptorIOContext->run()接口来调度,当有新链接到来的时候,就会执行相应的accept回调处理,accept回调注册到iocontext的流程由acceptConnection()完成,该接口核心源码实现如下:

代码语言:javascript
复制

1.//accept新连接到来的回调注册?
2.void?TransportLayerASIO::_acceptConnection(GenericAcceptor&?acceptor)?{??
3.????  //新链接到来时候的回调函数,服务端接收到新连接都会执行该回调
4.    //注意这里面是递归执行,保证所有accept事件都会一次处理完毕
5.????auto?acceptCb?=?[this,?&acceptor](const?std::error_code&?ec,?GenericSocket?peerSocket)?mutable?{??
6.????????if?(!_running.load())??
7.????????????return;??
8.??
9.????????......??
10.????    //每个新的链接都会new一个新的ASIOSession??
11.????????std::shared_ptr<ASIOSession>?session(new?ASIOSession(this,?std::move(peerSocket)));??
12.????    //新的链接处理ServiceEntryPointImpl::startSession,??
13.????????//和ServiceEntryPointImpl服务入口点模块关联起来??
14.????????_sep->startSession(std::move(session));??
15.????????//递归,直到处理完所有的网络accept事件??
16.????????_acceptConnection(acceptor);???
17.????};??
18.????//accept新连接到来后服务端的回调处理在这里注册??
19.????acceptor.async_accept(*_workerIOContext,?std::move(acceptCb));??
20.}??

TransportLayerASIO::_acceptConnection的新连接处理过程借助ASIO库实现,通过acceptor.async_accept实现所有监听的acceptor回调异步注册。

当服务端接收到客户端新连接事件通知后,会触发执行acceptCb()回调,该回调中底层ASIO库通过 epoll_wait获取到所有的accept事件,每获取到一个accept事件就代表一个新的客户端链接,然后调用ServiceEntryPointImpl::startSession()接口处理这个新的链接事件,整个过程递归执行,保证一次可以处理所有的客户端accept请求信息。

每个链接都会构造一个唯一的session信息,该session就代表一个唯一的新连接,链接和session一一对应。此外,最终会调用ServiceEntryPointImpl::startSession()进行真正的accept()处理,从而获取到一个新的链接。

注意:TransportLayerASIO::_acceptConnection()中实现了TransportLayerASIO类和ServiceEntryPointImpl类的关联,这两个类在该接口实现了关联。

此外,从前面的TransportLayerASIO类结构中可以看出,该类还包含如下四个接口:sourceMessage(...)、sinkMessage(...)、wait(Ticket&& ticket)、asyncWait(Ticket&& ticket, TicketCallback callback),这四个接口入参都和Ticket数据分发子模块相关联,具体核心代码实现如下:

代码语言:javascript
复制

1.//根据asioSession,?expiration,?message三个信息构造数据接收类ASIOSourceTicket??
2.Ticket?TransportLayerASIO::sourceMessage(...)?{??
3.????......??
4.????auto?asioSession?=?checked_pointer_cast<ASIOSession>(session);??
5.????//根据asioSession,?expiration,?message三个信息构造ASIOSourceTicket??
6.????auto?ticket?=?stdx::make_unique<ASIOSourceTicket>(asioSession,?expiration,?message);??
7.????return?{this,?std::move(ticket)};??
8.}??
9.??
10.//根据asioSession,?expiration,?message三个信息构造数据发送类ASIOSinkTicket??
11.Ticket?TransportLayerASIO::sinkMessage(...)?{??
12.????auto?asioSession?=?checked_pointer_cast<ASIOSession>(session);??
13.????auto?ticket?=?stdx::make_unique<ASIOSinkTicket>(asioSession,?expiration,?message);??
14.????return?{this,?std::move(ticket)};??
15.}??
16.??
17.//同步接收或者发送,最终调用ASIOSourceTicket::fill?或者?ASIOSinkTicket::fill??
18.Status?TransportLayerASIO::wait(Ticket&&?ticket)?{??
19.????//获取对应Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket??
20.????auto?ownedASIOTicket?=?getOwnedTicketImpl(std::move(ticket));??
21.????auto?asioTicket?=?checked_cast<ASIOTicket*>(ownedASIOTicket.get());??
22.????......??
23.????//调用对应fill接口?同步接收ASIOSourceTicket::fill?或者?同步发送ASIOSinkTicket::fill??
24.????asioTicket->fill(true,?[&waitStatus](Status?result)?{?waitStatus?=?result;?});??
25.????return?waitStatus;??
26.}??
27.//异步接收或者发送,最终调用ASIOSourceTicket::fill?或者?ASIOSinkTicket::fill??
28.void?TransportLayerASIO::asyncWait(Ticket&&?ticket,?TicketCallback?callback)?{??
29.????//获取对应数据收发的Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket??
30.????auto?ownedASIOTicket?=?std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));??
31.????auto?asioTicket?=?checked_cast<ASIOTicket*>(ownedASIOTicket.get());??
32.??
33.???//调用对应ASIOTicket::fill??
34.????asioTicket->fill(??
35.????????false,???[?callback?=?std::move(callback),??
36.????????ownedASIOTicket?=?std::move(ownedASIOTicket)?](Status?status)?{?callback(status);?});??
37.}??

上面四个接口中的前两个接口主要通过Session,?expiration,?message这三个参数来获取对应的Ticket?信息,实际上mongodb内核实现中把接收数据的Ticket和发送数据的Ticket分别用不同的继承类ASIOSourceTicket和ASIOSinkTicket来区分,三个参数的作用如下表所示:

数据收发包括同步收发和异步收发,同步收发通过TransportLayerASIO::wait()实现,异步收发通过TransportLayerASIO::asyncWait()实现。

注意:以上四个接口把TransportLayerASIO类和Ticket?数据收发类的关联。???

2.2总结

??? transport_layer套接字处理及传输层管理子模块主要由transport_layer_manager和transport_layer_asio两个核心类组成,这两个类的核心接口功能总结如下表所示:

Transport_layer_manager中初始化TransportLayer和serviceExecutor,net.TransportLayer配置可以为legacy和asio,其中legacy已经淘汰,当前内核只支持asio模式。asio配置对应的TransportLayer由TransportLayerASIO实现,对应的serviceExecutor线程模型可以是adaptive动态线程模型,也可以是synchronous同步线程模型。

套接字创建、bind()绑定、listen()监听、accept事件注册等都由本类实现,同时数据分发Ticket模块也与本模块关联,一起配合完成整个后续Ticket模块模块的同步及异步数据读写流程。此外,本模块还通过ServiceEntryPoint服务入口子模块联动,保证了套接字初始化、accept事件注册完成后,服务入口子模块能有序的进行新连接接收处理。

接下来继续分析本模块相关联的ServiceEntryPoint服务入口子模块和Ticket数据分发子模块实现。

3.?service_entry_point服务入口点子模块

service_entry_point服务入口点子模块主要负责如下功能:新连接处理、Session会话管理、接收到一个完整报文后的回调处理(含报文解析、认证、引擎层处理等)。

该模块的源码实现主要包含以下几个文件:

???? service_entry_point开头的代码文件都和本模块相关,其中service_entry_point_utils*负责工作线程创建,service_entry_point_impl*完成新链接回调处理及sesseion会话管理。

3.1核心源码实现

???? 服务入口子模块相关代码实现比较简洁,主要由ServiceEntryPointImpl类和service_entry_point_utils中的线程创建函数组成。

3.1.1 ServiceEntryPointImpl类核心代码实现

ServiceEntryPointImpl类主要成员和接口如下:

代码语言:javascript
复制

1.class?ServiceEntryPointImpl?:?public?ServiceEntryPoint?{??
2.????MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);??
3.public:??
4.????//构造函数??
5.????explicit?ServiceEntryPointImpl(ServiceContext*?svcCtx);?????
6.????//以下三个接口进行session会话处理控制??
7.????void?startSession(transport::SessionHandle?session)?final;??
8.????void?endAllSessions(transport::Session::TagMask?tags)?final;??
9.????bool?shutdown(Milliseconds?timeout)?final;??
10.????//session会话统计??
11.????Stats?sessionStats()?const?final;??
12.????......??
13.private:??
14.????//该list结构管理所有的ServiceStateMachine信息??
15.????using?SSMList?=?stdx::list<std::shared_ptr<ServiceStateMachine>>;??
16.????//SSMList对应的迭代器??
17.????using?SSMListIterator?=?SSMList::iterator;??
18.????//赋值ServiceEntryPointImpl::ServiceEntryPointImpl??
19.????//对应ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)类??
20.????ServiceContext*?const?_svcCtx;???
21.????//该成员变量在代码中没有使用??
22.????AtomicWord<std::size_t>?_nWorkers;??
23.????//锁??
24.????mutable?stdx::mutex?_sessionsMutex;??
25.????//一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中??
26.????SSMList?_sessions;??
27.????//最大链接数控制??
28.????size_t?_maxNumConnections{DEFAULT_MAX_CONN};??
29.????//当前的总链接数,不包括关闭的链接??
30.????AtomicWord<size_t>?_currentConnections{0};??
31.????//所有的链接,包括已经关闭的链接??
32.????AtomicWord<size_t>?_createdConnections{0};??
33.};??

该类的几个接口主要是session相关控制处理,该类中的变量成员说明如下:

ServiceEntryPointImpl类最核心的startSession()接口负责每个新连接到来后的内部回调处理,具体实现如下:

代码语言:javascript
复制

1.//新链接到来后的回调处理??
2.void?ServiceEntryPointImpl::startSession(transport::SessionHandle?session)?{???
3.????//获取该新连接对应的服务端和客户端地址信息??
4.????const?auto&?remoteAddr?=?session->remote().sockAddr();??
5.????const?auto&?localAddr?=?session->local().sockAddr();??
6.????//服务端和客户端地址记录到session中??
7.????auto?restrictionEnvironment?=??stdx::make_unique<RestrictionEnvironment>(*remoteAddr,?*localAddr);??
8.????RestrictionEnvironment::set(session,?std::move(restrictionEnvironment));??
9.????......??
10.??
11.????//获取transportMode,kAsynchronous或者kSynchronous??
12.????auto?transportMode?=?_svcCtx->getServiceExecutor()->transportMode();??
13.????//构造ssm??
14.????auto?ssm?=?ServiceStateMachine::create(_svcCtx,?session,?transportMode);??
15.????{//该{}体内实现链接计数,同时把ssm统一添加到_sessions列表管理??
16.????????stdx::lock_guard<decltype(_sessionsMutex)>?lk(_sessionsMutex);??
17.????????connectionCount?=?_sessions.size()?+?1;?//连接数自增??
18.????????if?(connectionCount?<=?_maxNumConnections)?{??
19.????????????//新来的链接对应的session保存到_sessions链表????
20.????????????//一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中??
21.????????????ssmIt?=?_sessions.emplace(_sessions.begin(),?ssm);??
22.????????????_currentConnections.store(connectionCount);??
23.????????????_createdConnections.addAndFetch(1);??
24.????????}??
25.????}??
26.????//链接超限,直接退出??
27.????if?(connectionCount?>?_maxNumConnections)?{???
28.????????......??
29.????????return;??
30.????}??
31.????//链接关闭的回收处理??
32.????ssm->setCleanupHook([?this,?ssmIt,?session?=?std::move(session)?]?{??
33.?????????......??
34.????});??
35.????//获取transport模式为同步模式还是异步模式,也就是adaptive线程模式还是synchronous线程模式??
36.????auto?ownership?=?ServiceStateMachine::Ownership::kOwned;??
37.????if?(transportMode?==?transport::Mode::kSynchronous)?{??
38.????????ownership?=?ServiceStateMachine::Ownership::kStatic;??
39.????}??
40.????//ServiceStateMachine::start,这里和服务状态机模块衔接起来??
41.????ssm->start(ownership);??
42.}??

???? 该接口拿到该链接对应的服务端和客户端地址后,记录到该链接对应session中,然后根据该session、transportMode、_svcCtx构建一个服务状态机ssm(ServiceStateMachine)。一个新链接对应一个唯一session,一个session对应一个唯一的服务状态机ssm,这三者保持唯一的一对一关系。

????? 最终,startSession()让服务入口子模块、session会话子模块、ssm状态机子模块关联起来。 ??

3.1.2 ?service_entry_point_utils核心代码实现

service_entry_point_utils源码文件只有launchServiceWorkerThread一个接口,该接口主要负责工作线程创建,并设置每个工作线程的线程栈大小,如果系统默认栈大于1M,则每个工作线程的线程栈大小设置为1M,如果系统栈大小小于1M,则以系统堆栈大小为准,同时warning打印提示。该函数实现如下:

代码语言:javascript
复制

1.Status?launchServiceWorkerThread(stdx::function<void()>?task)?{??
2.????????static?const?size_t?kStackSize?=?1024?*?1024;??//1M??
3.????????struct?rlimit?limits;??
4.????????//或者系统堆栈大小??
5.????????invariant(getrlimit(RLIMIT_STACK,?&limits)?==?0);??
6.????????//如果系统堆栈大小大于1M,则默认设置线程栈大小为1M??
7.????????if?(limits.rlim_cur?>?kStackSize)?{??
8.????????????size_t?stackSizeToSet?=?kStackSize;??
9.????????????int?failed?=?pthread_attr_setstacksize(&attrs,?stackSizeToSet);??
10.????????????if?(failed)?{??
11.????????????????const?auto?ewd?=?errnoWithDescription(failed);??
12.????????????????warning()?<<?"pthread_attr_setstacksize?failed:?"?<<?ewd;??
13.????????????}??
14.????????}?else?if?(limits.rlim_cur?<?1024?*?1024)?{??
15.????????????//如果系统栈大小小于1M,则已系统堆栈为准,同时给出告警??
16.????????????warning()?<<?"Stack?size?set?to?"?<<?(limits.rlim_cur?/?1024)?<<?"KB.?We?suggest?1MB";??
17.????????}}??
18.????????......??
19.????????//task参数传递给新建线程??
20.????????auto?ctx?=?stdx::make_unique<stdx::function<void()>>(std::move(task));??
21.????????int?failed?=?pthread_create(&thread,?&attrs,?runFunc,?ctx.get());???
22.????????......??
23.}?

3.2总结

service_entry_point服务入口点子模块主要负责新连接后的回调处理及工作线程创建,该模块和后续的session会话模块、SSM服务状态机模块衔接配合,完成数据收发的正常逻辑转换处理。上面的分析只列出了服务入口点子模块的核心接口实现,下表总结该模块所有的接口功能:

3.?Ticket数据收发子模块

???? Ticket数据收发子模块主要功能如下:调用session子模块进行底层asio库处理、拆分数据接收和数据发送到两个类、完整mongodb报文读取 、接收或者发送mongodb报文后的回调处理。

3.1 ASIOTicket类核心代码实现

Ticket数据收发模块相关实现主要由ASIOTicket类完成,该类结构如下:

代码语言:javascript
复制

1.//下面的ASIOSinkTicket和ASIOSourceTicket继承该类,用于控制数据的发送和接收??
2.class?TransportLayerASIO::ASIOTicket?:?public?TicketImpl?{??
3.public:??
4.????//初始化构造??
5.????explicit?ASIOTicket(const?ASIOSessionHandle&?session,?Date_t?expiration);??
6.????//获取sessionId??
7.????SessionId?sessionId()?const?final?{??
8.????????return?_sessionId;??
9.????}??
10.????//asio模式没用,针对legacy模型??
11.????Date_t?expiration()?const?final?{??
12.????????return?_expiration;??
13.????}??
14.
15.????//以下四个接口用于数据收发相关处理??
16.????void?fill(bool?sync,?TicketCallback&&?cb);??
17.protected:??
18.????void?finishFill(Status?status);??
19.????bool?isSync()?const;??
20.????virtual?void?fillImpl()?=?0;??
21.private:??
22.????//会话信息,一个链接一个session??
23.????std::weak_ptr<ASIOSession>?_session;??
24.????//每个session有一个唯一id??
25.????const?SessionId?_sessionId;??
26.????//asio模型没用,针对legacy生效??
27.????const?Date_t?_expiration;??
28.????//数据发送或者接收成功后的回调处理??
29.????TicketCallback?_fillCallback;??
30.????//同步方式还是异步方式进行数据处理,默认异步??
31.????bool?_fillSync;??
32.};??

该类保护多个成员变量,这些成员变量功能说明如下:

mongodb在具体实现上,数据接收和数据发送分开实现,分别是数据接收类ASIOSourceTicket和数据发送类ASIOSinkTicket,这两个类都继承自ASIOTicket类,这两个类的主要结构如下:

代码语言:javascript
复制

1.//数据接收的ticket??
2.class?TransportLayerASIO::ASIOSourceTicket?:?public?TransportLayerASIO::ASIOTicket?{??
3.public:??
4.????//初始化构造??
5.????ASIOSourceTicket(const?ASIOSessionHandle&?session,?Date_t?expiration,?Message*?msg);??
6.protected:??
7.????//数据接收Impl??
8.????void?fillImpl()?final;??
9.private:??
10.????//接收到mongodb头部数据后的回调处理??
11.????void?_headerCallback(const?std::error_code&?ec,?size_t?size);??
12.????//接收到mongodb包体数据后的回调处理????
13.????void?_bodyCallback(const?std::error_code&?ec,?size_t?size);??
14.??
15.????//存储数据的buffer,网络IO读取到的原始数据内容??
16.????SharedBuffer?_buffer;??
17.????//数据Message管理,数据来源为_buffer??
18.????Message*?_target;??
19.};??
1.
2.??
20.//数据发送的ticket??
21.class?TransportLayerASIO::ASIOSinkTicket?:?public?TransportLayerASIO::ASIOTicket?{??
22. public:??
23.????//初始化构造??
24.????ASIOSinkTicket(const?ASIOSessionHandle&?session,?Date_t?expiration,?const?Message&?msg);??
25.protected:??
26.    //数据发送Impl??
27.????void?fillImpl()?final;??
28.private:??
29.????//发送数据完成的回调处理??
30.????void?_sinkCallback(const?std::error_code&?ec,?size_t?size);??
31.????//需要发送的数据message信息??
32.????Message?_msgToSend;??
33.};?

??? 从上面的代码实现可以看出,ASIOSinkTicket?和ASIOSourceTicket?类接口及成员实现几乎意义,只是具体的实现方法不同,下面对ASIOSourceTicket和ASIOSinkTicket?相关核心代码实现进行分析。

3.1.2 ASIOSourceTicket?数据接收核心代码实现

数据接收过程核心代码如下:

代码语言:javascript
复制

1.//数据接收的fillImpl接口实现??
2.void?TransportLayerASIO::ASIOSourceTicket::fillImpl()?{????
3.????//获取对应session信息??
4.????auto?session?=?getSession();??
5.????if?(!session)??
6.????????return;??
7.????//收到读取mongodb头部数据,头部数据长度是固定的kHeaderSize字节??
8.????const?auto?initBufSize?=?kHeaderSize;??
9.????_buffer?=?SharedBuffer::allocate(initBufSize);??
10.??
11.????//调用TransportLayerASIO::ASIOSession::read读取底层数据存入_buffer??
12.????//读完头部数据后执行对应的_headerCallback回调函数??
13.????session->read(isSync(),??
14.??????????????????asio::buffer(_buffer.get(),?initBufSize),?//先读取头部字段出来??
15.??????????????????[this](const?std::error_code&?ec,?size_t?size)?{?_headerCallback(ec,?size);?});??
16.}??
17.??
18.//读取到mongodb?header头部信息后的回调处理??
19.void?TransportLayerASIO::ASIOSourceTicket::_headerCallback(const?std::error_code&?ec,?size_t?size)?{??
20.????......??
21.????//获取session信息??
22.????auto?session?=?getSession();??
23.????if?(!session)??
24.????????return;??
25.????//从_buffer中获取头部信息??
26.????MSGHEADER::View?headerView(_buffer.get());??
27.????//获取message长度??
28.????auto?msgLen?=?static_cast<size_t>(headerView.getMessageLength());??
29.????//长度太小或者太大,直接报错??
30.????if?(msgLen?<?kHeaderSize?||?msgLen?>?MaxMessageSizeBytes)?{??
31.????????.......??
32.????????return;??
33.????}??
34.????....??
35.???//内容还不够一个mongo协议报文,继续读取body长度字节的数据,读取完毕后开始body处理??
36.???//注意这里是realloc,保证头部和body在同一个buffer中??
37.????_buffer.realloc(msgLen);???
38.????MsgData::View?msgView(_buffer.get());??
39.??
40.????//调用底层TransportLayerASIO::ASIOSession::read读取数据body???
41.????session->read(isSync(),??
42.??????//数据读取到该buffer??????????????????
43.??????asio::buffer(msgView.data(),?msgView.dataLen()),??
44.??????//读取成功后的回调处理??
45.??????[this](const?std::error_code&?ec,?size_t?size)?{?_bodyCallback(ec,?size);?});??
46.}??
47.??
48.//_headerCallback对header读取后解析header头部获取到对应的msg长度,然后开始body部分处理??
49.void?TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const?std::error_code&?ec,?size_t?size)?{??
50.????......??
51.????//buffer转存到_target中??
52.????_target->setData(std::move(_buffer));??
53.????//流量统计??
54.????networkCounter.hitPhysicalIn(_target->size());??
55.????//TransportLayerASIO::ASIOTicket::finishFill????
56.????finishFill(Status::OK());?//包体内容读完后,开始下一阶段的处理????
57.????//报文读取完后的下一阶段就是报文内容处理,开始走ServiceStateMachine::_processMessage??
58.}??

Mongodb协议由msg header + msg body组成,一个完整的mongodb报文内容格式如下:

上图所示各个字段及body内容部分功能说明如下表:

??? ASIOSourceTicket类的几个核心接口都是围绕这一原则展开,整个mongodb数据接收流程如下:

1.?读取mongodb头部header数据,解析出header中的messageLength字段。

2.?检查messageLength字段是否在指定的合理范围,该字段不能小于Header整个头部大小,也不能超过MaxMessageSizeBytes最大长度。

3.?Header len检查通过,说明读取header数据完成,于是执行_headerCallback回调。

4.?realloc更多的空间来存储body内容。

5.?继续读取body len长度数据,读取body完成后,执行_bodyCallback回调处理。

3.1.3 ASIOSinkTicket数据发送类核心代码实现

???? ASIOSinkTicket发送类相比接收类,没有数据解析相关的流程,因此实现过程会更加简单,具体源码实现如下:

代码语言:javascript
复制

1.//发送数据成功后的回调处理??
2.void?TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const?std::error_code&?ec,?size_t?size)?{??
3.????//发送的网络字节数统计??
4.????networkCounter.hitPhysicalOut(_msgToSend.size());???
5.????//执行SSM中对应的状态流程??
6.????finishFill(ec???errorCodeToStatus(ec)?:?Status::OK());??
7.}??
8.??
9.//发送数据的fillImpl??
10.void?TransportLayerASIO::ASIOSinkTicket::fillImpl()?{??
11.????//获取对应session??
12.????auto?session?=?getSession();??
13.????if?(!session)??
14.????????return;??
15.??
16.????//调用底层TransportLayerASIO::ASIOSession::write发送数据,发送成功后执行_sinkCallback回调???
17.????session->write(isSync(),??
18.???????asio::buffer(_msgToSend.buf(),?_msgToSend.size()),??
19.???????//发送数据成功后的callback回调??
20.???????[this](const?std::error_code&?ec,?size_t?size)?{?_sinkCallback(ec,?size);?});??
21.}??

3.2总结

从上面的分析可以看出,Ticket数据收发模块主要调用session会话模块来进行底层数据的读写、解析,当读取或者发送一个完整的mongodb报文后最终交由SSM服务状态机模块调度处理。

ticket模块主要接口功能总结如下表所示:

前面的分析也可以看出,Ticket数据收发模块会调用session处理模块来进行真正的数据读写,同时接收或者发送完一个完整mongodb报文后的后续回调处理讲交由SSM服务状态机模块处理。

4.?Session会话子模块

Session会话模块功能主要如下:负责记录HostAndPort、和底层asio库直接互动,实现数据的同步或者异步收发。一个新连接fd对应一个唯一的session,对fd的操作直接映射为session操作。Session会话子模块主要代码实现相关文件如下:

4.1 session会话子模块核心代码实现

代码语言:javascript
复制

1.class?TransportLayerASIO::ASIOSession?:?public?Session?{??
2.????//初始化构造??
3.????ASIOSession(TransportLayerASIO*?tl,?GenericSocket?socket);??
4.????//获取本session使用的tl??
5.????TransportLayer*?getTransportLayer();??
6.????//以下四个接口套接字相关,本端/对端地址获取,获取fd,关闭fd等??
7.????const?HostAndPort&?remote();??
8.????const?HostAndPort&?local();??
9.????GenericSocket&?getSocket();??
10.????void?shutdown();??
11.??
12.????//以下四个接口调用asio网络库实现数据的同步收发和异步收发??
13.????void?read(...)??
14.????void?write(...)??
15.????void?opportunisticRead(...)??
16.????void?opportunisticWrite(...)??
17.??
18.????//远端地址信息??
19.????HostAndPort?_remote;??
20.????//本段地址信息??
21.????HostAndPort?_local;??
22.????//赋值见TransportLayerASIO::_acceptConnection??
23.????//也就是fd,一个新连接对应一个_socket??
24.????GenericSocket?_socket;??
25.????//SSL相关不做分析,??
26.#ifdef?MONGO_CONFIG_SSL??
27.????boost::optional<asio::ssl::stream<decltype(_socket)>>?_sslSocket;??
28.????bool?_ranHandshake?=?false;??
29.#endif??
30.??
31.????//本套接字对应的tl,赋值建TransportLayerASIO::_acceptConnection(...)??
32.????TransportLayerASIO*?const?_tl;??
33.}?

该类最核心的三个接口ASIOSession(...)、opportunisticRead(...)、opportunisticWrite(..)分别完成套接字处理、调用asio库接口实现底层数据读和底层数据写。这三个核心接口源码实现如下:

代码语言:javascript
复制

1.//初始化构造?TransportLayerASIO::_acceptConnection调用??
2.ASIOSession(TransportLayerASIO*?tl,?GenericSocket?socket)??
3.????//fd描述符及TL初始化赋值??
4.????:?_socket(std::move(socket)),?_tl(tl)?{??
5.????std::error_code?ec;??
6.??
7.????//异步方式设置为非阻塞读??
8.????_socket.non_blocking(_tl->_listenerOptions.transportMode?==?Mode::kAsynchronous,?ec);??
9.????fassert(40490,?ec.value()?==?0);??
10.??
11.????//获取套接字的family??
12.????auto?family?=?endpointToSockAddr(_socket.local_endpoint()).getType();??
13.????//满足AF_INET
14.????if?(family?==?AF_INET?||?family?==?AF_INET6)?{??
15.????????//no_delay?keep_alive套接字系统参数设置??
16.????????_socket.set_option(asio::ip::tcp::no_delay(true));??
17.????????_socket.set_option(asio::socket_base::keep_alive(true));??
18.????????//KeepAlive系统参数设置??
19.????????setSocketKeepAliveParams(_socket.native_handle());??
20.????}??
21.??
22.????//获取本端和对端地址??
23.????_local?=?endpointToHostAndPort(_socket.local_endpoint());??
24.????_remote?=?endpointToHostAndPort(_socket.remote_endpoint(ec));??
25.????if?(ec)?{??
26.????????LOG(3)?<<?"Unable?to?get?remote?endpoint?address:?"?<<?ec.message();??
27.????}??
28.}??

该类初始化的时候完成新连接_socket相关的初始化设置,包括阻塞读写还是非阻塞读写。如果是同步线程模型(一个链接一个线程),则读写方式位阻塞读写;如果是异步线程模型(adaptive动态线程模型),则调用asio网络库接口实现异步读写。

此外,该链接socket对应的客户端ip:port和服务端ip:port也在该初始化类中获取,最终保存到本session的remote和_local成员中。

数据读取核心代码实现如下:

代码语言:javascript
复制

1.//读取指定长度数据,然后执行handler回调??
2.void?opportunisticRead(...)?{??
3.????std::error_code?ec;??
4.????//如果是异步线程模型,在ASIOSession构造初始化的时候会设置non_blocking非阻塞模式??
5.????//异步线程模型这里实际上是非阻塞读取,如果是同步线程模型,则没有non_blocking设置,也就是阻塞读取??
6.????auto?size?=?asio::read(stream,?buffers,?ec);????
7.??
8.????//如果是异步读,并且read返回would_block或者try_again说明指定长度的数据还没有读取完毕??
9.????if?((ec?==?asio::error::would_block?||?ec?==?asio::error::try_again)?&&?!sync)?{??
10.????????//buffers有大小size,实际读最多读size字节??
11.????????MutableBufferSequence?asyncBuffers(buffers);??
12.????????if?(size?>?0)?{??
13.????????????asyncBuffers?+=?size;?//buffer?offset向后移动??
14.????????}??
15.??
16.????????//继续异步方式读取数据,读取到指定长度数据后执行handler回调处理??
17.????????asio::async_read(stream,?asyncBuffers,?std::forward<CompleteHandler>(handler));??
18.????}?else?{???
19.????????//阻塞方式读取read返回后可以保证读取到了size字节长度的数据??
20.????????//直接read获取到size字节数据,则直接执行handler???
21.????????handler(ec,?size);??
22.????}??
23.}??

opportunisticRead首先调用asio::read(stream, buffers, ec)读取buffers对应size长度的数据,buffers空间大小就是需要读取的数据size大小。如果是同步线程模型,则这里为阻塞式读取,直到读到size字节才会返回;如果是异步线程模型,这这里是非阻塞读取,非阻塞读取当内核网络协议栈数据读取完毕后,如果还没有读到size字节,则继续进行async_read异步读取。

当buffers指定的size字节全部读取完整后,不管是同步模式还是异步模式,最终都会执行handler回调,开始后续的数据解析及处理流程。

发送数据核心代码实现如下:

代码语言:javascript
复制

1.//发送数据??
2.void?opportunisticWrite(...)?{??
3.????std::error_code?ec;??
4.????//如果是同步模式,则阻塞写,直到全部写成功。异步模式则非阻塞写??
5.????auto?size?=?asio::write(stream,?buffers,?ec);???
6.??
7.????//异步写如果返回try_again说明数据还没有发送完,则继续异步写发送??
8.????if?((ec?==?asio::error::would_block?||?ec?==?asio::error::try_again)?&&?!sync)?{??
9.????????ConstBufferSequence?asyncBuffers(buffers);??
10.????????if?(size?>?0)?{??//buffer中数据指针偏移计数
11.????????????asyncBuffers?+=?size;??
12.????????}??
13.????????//异步写发送完成,执行handler回调??
14.????????asio::async_write(stream,?asyncBuffers,?std::forward<CompleteHandler>(handler));??
15.????}?else?{??
16.????????//同步写成功,则直接执行handler回调处理??
17.????????handler(ec,?size);??
18.????}??
19.}??

数据发送流程和数据接收流程类似,也分位同步模式发送和异步模式发送,同步模式发送为阻塞式写,只有当所有数据通过asio::write()发送成功后才返回;异步模式发送为非阻塞写,asio::write()不一定全部发送出去,因此需要再次调用asio库的asio::async_write()进行异步发送。

不管是同步模式还是异步模式发送数据,最终数据发送成功后,都会调用handler()回调来执行后续的流程。

4.2总结

从上面的代码分析可以看出,session会话模块最终直接和asio网络库交互实现数据的读写操作。该模块核心接口功能总结如下表:

5.?总结

《Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计》一文对mongodb网络传输模块中的ASIO网络库实现、service_executor服务运行子模块(即线程模型子模块)进行了详细的分析,加上本文的transport_layer套接字处理及传输层管理子模块、session会话子模块、Ticket数据收发子模块、service_entry_point服务入口点子模块。

???? transport_layer套接字处理及传输层管理子模块主要由transport_layer_manager和transport_layer_asio两个核心类组成。分别完成net相关的配置文件初始化操作,套接字初始化及处理,最终transport_layer_asio的相应接口实现了和ticket数据分发子模块、服务入口点子模块的关联。

???? 服务入口子模块主要由ServiceEntryPointImpl类和service_entry_point_utils中的线程创建函数组成,实现新连接accept处理及控制。该模块通过startSession()让服务入口子模块、session会话子模块、ssm状态机子模块关联起来。

?? ??ticket数据收发子模块主要功能如下:调用session子模块进行底层asio库处理、拆分数据接收和数据发送到两个类、完整mongodb报文读取 、接收或者发送mongodb报文后的回调处理,回调处理由SSM服务状态机模块管理,当读取或者发送一个完整的mongodb报文后最终交由SSM服务状态机模块调度处理。。

???? Session会话模块功能主要如下:负责记录HostAndPort、和底层asio库直接互动,实现数据的同步或者异步收发。一个新连接fd对应一个唯一的session,对fd的操作直接映射为session操作。

到这里,整个mongodb网络传输层模块分析只差service_state_machine状态机调度子模块,状态机调度子模块相比本文分析的几个子模块更加复杂,因此将在下期《mongodb网络传输层模块源码分析三》中单独分析。

本文所有源码注释分析详见如下链接:mongodb网络传输模块详细源码分析

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/1cac5adcd1b4f3fe512a1457a
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券
http://www.vxiaotou.com