点击上方“芋道源码”,选择“设为星标”
管她前浪,还是后浪?
能浪的浪,才是好浪!
每天 10:33?更新文章,每天掉亿点点头发...
源码精品专栏
今天来分享一下Nacos注册中心的底层原理,从服务注册到服务发现,非常细致
再讲Nacos之前,先来讲一下服务注册和发现。我们知道,现在微服务架构是目前开发的一个趋势。服务消费者要去调用多个服务提供者组成的集群。这里需要做到以下几点:
因此需要引入服务注册中心,它具有以下几个功能:
而Nacos致力于解决微服务中的统一配置,服务注册和发现等问题。Nacos集成了注册中心和配置中心。其相关特性包括:
1.服务发现和服务健康监测
Nacos支持基于DNS和RPC的服务发现,即服务消费者可以使用DNS或者HTTP的方式来查找和发现服务。Nacos提供对服务的实时的健康检查,阻止向不健康的主机或者服务实例发送请求。Nacos支持传输层(
Ping/TCP
)、应用层(HTTP、Mysql)的健康检查。
2.动态配置服务
动态配置服务可以以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。
3.动态DNS服务
支持权重路由,让开发者更容易的实现中间层的负载均衡、更灵活的路由策略、流量控制以及DNS解析服务。
4.服务和元数据管理
Nacos允许开发者从微服务平台建设的视角来管理数据中心的所有服务和元数据。如:服务的生命周期、静态依赖分析、服务的健康状态、服务的流量管理、路由和安全策略等。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
以下是Nacos的架构图:
其中分为这么几个模块:
Virtual IP
或者DNS的方式实现Nacos高可用集群的服务路由。OpenAPI
:功能访问入口。Config Service、Naming Service
:Nacos提供的配置服务、名字服务模块。Consistency Protocol
:一致性协议,用来实现Nacos集群节点的数据同步,使用Raft算法实现。其中包含:
小总结:
这里对其原理做一个大致的介绍,在后文则从源码角度进行分析。
首先,服务注册的功能体现在:
Nacos服务注册和发现的实现原理的图如下:
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
前提(在本地或者虚机上先启动好Nacos) 这一部分从2个角度来讲Nacos是如何实现的:
首先看下一个包:spring-cloud-commons
这个ServiceRegistry
接口是SpringCloud提供的服务注册的标准,集成到SpringCloud中实现服务注册的组件,都需要实现这个接口。来看下它的结构:
public?interface?ServiceRegistry<R?extends?Registration>?{??
????void?register(R?registration);??
??
????void?deregister(R?registration);??
??
????void?close();??
??
????void?setStatus(R?registration,?String?status);??
??
????<T>?T?getStatus(R?registration);??
}??
那么对于Nacos而言,该接口的实现类是NacosServiceRegistry
,该类在这个pom包下:
再回过头来看spring-cloud-commons
包:
spring.factories
主要是包含了自动装配的配置信息,如图:
在我之前的文章里我有提到过,在spring.factories
中配置EnableAutoConfiguration
的内容后,项目在启动的时候,会导入相应的自动配置类,那么也就允许对该类的相关属性进行一个自动装配。那么显然,在这里导入了AutoServiceRegistrationAutoConfiguration
这个类,而这个类顾名思义是服务注册相关的配置类。
该类的完整代码如下:
@Configuration(??
????proxyBeanMethods?=?false??
)??
@Import({AutoServiceRegistrationConfiguration.class})??
@ConditionalOnProperty(??
????value?=?{"spring.cloud.service-registry.auto-registration.enabled"},??
????matchIfMissing?=?true??
)??
public?class?AutoServiceRegistrationAutoConfiguration?{??
????@Autowired(??
????????required?=?false??
????)??
????private?AutoServiceRegistration?autoServiceRegistration;??
????@Autowired??
????private?AutoServiceRegistrationProperties?properties;??
??
????public?AutoServiceRegistrationAutoConfiguration()?{??
????}??
??
????@PostConstruct??
????protected?void?init()?{??
????????if?(this.autoServiceRegistration?==?null?&&?this.properties.isFailFast())?{??
????????????throw?new?IllegalStateException("Auto?Service?Registration?has?been?requested,?but?there?is?no?AutoServiceRegistration?bean");??
????????}??
????}??
}??
这里做一个分析,AutoServiceRegistrationAutoConfiguration
中注入了AutoServiceRegistration
实例,该类的关系图如下:
我们先来看一下这个抽象类AbstractAutoServiceRegistration
:
public?abstract?class?AbstractAutoServiceRegistration<R?extends?Registration>?implements?AutoServiceRegistration,???
ApplicationContextAware,???
ApplicationListener<WebServerInitializedEvent>?{??
?public?void?onApplicationEvent(WebServerInitializedEvent?event)?{??
?????this.bind(event);??
?}??
}??
这里实现了ApplicationListener
接口,并且传入了WebServerInitializedEvent
作为泛型,啥意思嘞,意思是:
NacosAutoServiceRegistration
监听WebServerInitializedEvent
事件。onApplicationEvent()
,该方法最终调用NacosServiceRegistry
的register()
方法(NacosServiceRegistry
实现了Spring的一个服务注册标准接口)。对于register()
方法,主要调用的是Nacos Client SDK
中的NamingService
下的registerInstance()
方法完成服务的注册。
public?void?register(Registration?registration)?{??
????if?(StringUtils.isEmpty(registration.getServiceId()))?{??
????????log.warn("No?service?to?register?for?nacos?client...");??
????}?else?{??
????????String?serviceId?=?registration.getServiceId();??
????????String?group?=?this.nacosDiscoveryProperties.getGroup();??
????????Instance?instance?=?this.getNacosInstanceFromRegistration(registration);??
??
????????try?{??
????????????this.namingService.registerInstance(serviceId,?group,?instance);??
????????????log.info("nacos?registry,?{}?{}?{}:{}?register?finished",?new?Object[]{group,?serviceId,?instance.getIp(),?instance.getPort()});??
????????}?catch?(Exception?var6)?{??
????????????log.error("nacos?registry,?{}?register?failed...{},",?new?Object[]{serviceId,?registration.toString(),?var6});??
????????????ReflectionUtils.rethrowRuntimeException(var6);??
????????}??
??
????}??
}??
??
public?void?registerInstance(String?serviceName,?String?groupName,?Instance?instance)?throws?NacosException?{??
????if?(instance.isEphemeral())?{??
????????BeatInfo?beatInfo?=?new?BeatInfo();??
????????beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName,?groupName));??
????????beatInfo.setIp(instance.getIp());??
????????beatInfo.setPort(instance.getPort());??
????????beatInfo.setCluster(instance.getClusterName());??
????????beatInfo.setWeight(instance.getWeight());??
????????beatInfo.setMetadata(instance.getMetadata());??
????????beatInfo.setScheduled(false);??
????????long?instanceInterval?=?instance.getInstanceHeartBeatInterval();??
????????beatInfo.setPeriod(instanceInterval?==?0L???DEFAULT_HEART_BEAT_INTERVAL?:?instanceInterval);??
????????//?1.addBeatInfo()负责创建心跳信息实现健康监测。因为Nacos?Server必须要确保注册的服务实例是健康的。??
????????//?而心跳监测就是服务健康监测的一种手段。??
????????this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName,?groupName),?beatInfo);??
????}??
?//?2.registerService()实现服务的注册??
????this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName,?groupName),?groupName,?instance);??
}??
再来看一下心跳监测的方法addBeatInfo()
:
public?void?addBeatInfo(String?serviceName,?BeatInfo?beatInfo)?{??
????LogUtils.NAMING_LOGGER.info("[BEAT]?adding?beat:?{}?to?beat?map.",?beatInfo);??
????String?key?=?this.buildKey(serviceName,?beatInfo.getIp(),?beatInfo.getPort());??
????BeatInfo?existBeat?=?null;??
????if?((existBeat?=?(BeatInfo)this.dom2Beat.remove(key))?!=?null)?{??
????????existBeat.setStopped(true);??
????}??
??
????this.dom2Beat.put(key,?beatInfo);??
????//?通过schedule()方法,定时的向服务端发送一个数据包,然后启动一个线程不断地检测服务端的回应。??
????//?如果在指定的时间内没有收到服务端的回应,那么认为服务器出现了故障。??
????//?参数1:可以说是这个实例的相关信息。??
????//?参数2:一个long类型的时间,代表从现在开始推迟执行的时间,默认是5000??
????//?参数3:时间的单位,默认是毫秒,结合5000即代表每5秒发送一次心跳数据包??
????this.executorService.schedule(new?BeatReactor.BeatTask(beatInfo),?beatInfo.getPeriod(),?TimeUnit.MILLISECONDS);??
????MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());??
}??
心跳检查如果正常,即代表这个需要注册的服务是健康的,那么执行下面的注册方法registerInstance()
:
public?void?registerService(String?serviceName,?String?groupName,?Instance?instance)?throws?NacosException?{??
????LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE]?{}?registering?service?{}?with?instance:?{}",?new?Object[]{this.namespaceId,?serviceName,?instance});??
????Map<String,?String>?params?=?new?HashMap(9);??
????params.put("namespaceId",?this.namespaceId);??
????params.put("serviceName",?serviceName);??
????params.put("groupName",?groupName);??
????params.put("clusterName",?instance.getClusterName());??
????params.put("ip",?instance.getIp());??
????params.put("port",?String.valueOf(instance.getPort()));??
????params.put("weight",?String.valueOf(instance.getWeight()));??
????params.put("enable",?String.valueOf(instance.isEnabled()));??
????params.put("healthy",?String.valueOf(instance.isHealthy()));??
????params.put("ephemeral",?String.valueOf(instance.isEphemeral()));??
????params.put("metadata",?JSON.toJSONString(instance.getMetadata()));??
????//?这里可以看出来,把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求??
????this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE,?params,?(String)"POST");??
}??
下面直接Debug走一遍。两个前提(这里不再展开):
1.项目初始化后,根据上文说法,会执行抽象类AbstractAutoServiceRegistration
下面的onApplicationEvent()
方法,即事件被监听到。
2.作为抽象类的子类实现NacosAutoServiceRegistration
,监听到Web服务启动后, 开始执行super.register()
方法。
3.执行NacosServiceRegistry
下的register()
方法(super),前面说过,集成到SpringCloud中实现服务注册的组件,都需要实现ServiceRegistry
这个接口,而对于Nacos而言,NacosServiceRegistry
就是具体的实现子类。执行注册方法需要传入的三个参数:
而registerInstance()
主要做两件事:
this.beatReactor.addBeatInfo()
)。this.serverProxy.registerService()
)。服务健康的检查:检查通过后,发送OpenAPI进行服务的注册:
这里来做一个大框架式的梳理(也许前面写的有点乱,这里通过几个问答的形式来进行总结)
问题1:Nacos的服务注册为什么和
spring-cloud-commons
这个包扯上关系?
回答:
spring-cloud-starter-alibaba-nacos-discovery
吧。spring-cloud-commons
包,那么这个包有什么用?spring-cloud-commons
中有一个接口叫做ServiceRegistry
,而集成到SpringCloud中实现服务注册的组件,都需要实现这个接口。NacosServiceRegistry
。问题2:为什么我的项目加了这几个依赖,服务启动时依旧没有注册到Nacos中?
回答:
spring-boot-starter-web
。问题3:除此之外,
spring-cloud-commons
这个包还有什么作用?
回答:
spring.factories
文件中,配置了相关的服务注册的置类,即支持其自动装配。AutoServiceRegistrationAutoConfiguration
。其注入了类AutoServiceRegistration
,而NacosAutoServiceRegistration
是该类的一个具体实现。说白了:
ServiceRegistry
接口。接下来就对Nacos注册的流程进行一个总结:
spring-cloud-commons
中spring.factories
的配置,自动装配了类AutoServiceRegistrationAutoConfiguration
。AutoServiceRegistrationAutoConfiguration
类中注入了类AutoServiceRegistration
,其最终实现子类实现了Spring的监听器。NacosServiceRegistry
的register()
方法。NamingService
下的registerInstance()
方法完成服务的注册。registerInstance()
方法主要做两件事:服务实例的健康监测和实例的注册。schedule()
方法定时的发送数据包,检测实例的健康。registerService()
方法,通过OpenAPI方式执行服务注册,其中将实例Instance的相关信息存储到HashMap中。有一点我们需要清楚:Nacos服务的发现发生在什么时候。例如:微服务发生远程接口调用的时候。一般我们在使用OpenFeign进行远程接口调用时,都需要用到对应的微服务名称,而这个名称就是用来进行服务发现的。
举个例子:
@FeignClient("test-application")??
public?interface?MyFeignService?{??
????@RequestMapping("getInfoById")??
????R?info(@PathVariable("id")?Long?id);??
}??
接下来直接开始讲重点,Nacos在进行服务发现的时候,会调用NacosServerList
类下的getServers()
方法:
public?class?NacosServerList?extends?AbstractServerList<NacosServer>?{??
?private?List<NacosServer>?getServers()?{??
????????try?{??
????????????String?group?=?this.discoveryProperties.getGroup();??
????????????//?1.通过唯一的serviceId(一般是服务名称)和组来获得对应的所有实例。??
????????????List<Instance>?instances?=?this.discoveryProperties.namingServiceInstance().selectInstances(this.serviceId,?group,?true);??
????????????//?2.将List<Instance>转换成List<NacosServer>数据,然后返回。??
????????????return?this.instancesToServerList(instances);??
????????}?catch?(Exception?var3)?{??
????????????throw?new?IllegalStateException("Can?not?get?service?instances?from?nacos,?serviceId="?+?this.serviceId,?var3);??
????????}??
????}??
}??
接下来来看一下NacosNamingService.selectInstances()
方法:
public?List<Instance>?selectInstances(String?serviceName,?String?groupName,?boolean?healthy)?throws?NacosException?{??
???return?this.selectInstances(serviceName,?groupName,?healthy,?true);??
}??
该方法最终会调用到其重载方法:
public?List<Instance>?selectInstances(String?serviceName,?String?groupName,?List<String>?clusters,???
??boolean?healthy,?boolean?subscribe)?throws?NacosException?{??
?//?保存服务实例信息的对象??
????ServiceInfo?serviceInfo;??
????//?如果该消费者订阅了这个服务,那么会在本地维护一个服务列表,服务从本地获取??
????if?(subscribe)?{??
????????serviceInfo?=?this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName,?groupName),?StringUtils.join(clusters,?","));??
????}?else?{??
????//?否则实例会从服务中心进行获取。??
????????serviceInfo?=?this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName,?groupName),?StringUtils.join(clusters,?","));??
????}??
??
????return?this.selectInstances(serviceInfo,?healthy);??
}??
这里应该重点关注this.hostReactor
这个对象,它里面比较重要的是几个Map类型的存储结构:
public?class?HostReactor?{??
????private?static?final?long?DEFAULT_DELAY?=?1000L;??
????private?static?final?long?UPDATE_HOLD_INTERVAL?=?5000L;??
????//?存放线程异步调用的一个回调结果??
????private?final?Map<String,?ScheduledFuture<?>>?futureMap;??
????//?本地已存在的服务列表,key是服务名称,value是ServiceInfo??
????private?Map<String,?ServiceInfo>?serviceInfoMap;??
????//?待更新的实例列表??
????private?Map<String,?Object>?updatingMap;??
????//?定时任务(负责服务列表的实时更新)??
????private?ScheduledExecutorService?executor;??
????....??
}??
再看一看它的getServiceInfo()
方法:
public?ServiceInfo?getServiceInfo(String?serviceName,?String?clusters)?{??
????LogUtils.NAMING_LOGGER.debug("failover-mode:?"?+?this.failoverReactor.isFailoverSwitch());??
????String?key?=?ServiceInfo.getKey(serviceName,?clusters);??
????if?(this.failoverReactor.isFailoverSwitch())?{??
????????return?this.failoverReactor.getService(key);??
????}?else?{??
?????//?1.先通过serverName即服务名获得一个serviceInfo??
????????ServiceInfo?serviceObj?=?this.getServiceInfo0(serviceName,?clusters);??
????????//?如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新Map??
????????//?这里是serviceInfoMap和updatingMap??
????????if?(null?==?serviceObj)?{??
????????????serviceObj?=?new?ServiceInfo(serviceName,?clusters);??
????????????this.serviceInfoMap.put(serviceObj.getKey(),?serviceObj);??
????????????this.updatingMap.put(serviceName,?new?Object());??
????????????//?2.updateServiceNow(),立刻去Nacos服务端拉去数据。??
????????????this.updateServiceNow(serviceName,?clusters);??
????????????this.updatingMap.remove(serviceName);??
????????}?else?if?(this.updatingMap.containsKey(serviceName))?{??
????????????synchronized(serviceObj)?{??
????????????????try?{??
????????????????????serviceObj.wait(5000L);??
????????????????}?catch?(InterruptedException?var8)?{??
????????????????????LogUtils.NAMING_LOGGER.error("[getServiceInfo]?serviceName:"?+?serviceName?+?",?clusters:"?+?clusters,?var8);??
????????????????}??
????????????}??
????????}??
??//?3.定时更新实例信息??
????????this.scheduleUpdateIfAbsent(serviceName,?clusters);??
????????//?最后返回服务实例数据(前面已经进行了更新)??
????????return?(ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());??
????}??
}??
来看下scheduleUpdateIfAbsent()
方法:
//?通过心跳的方式,每10秒去更新一次数据,并不是只有在调用服务的时候才会进行更新,而是通过定时任务来异步进行。??
public?void?scheduleUpdateIfAbsent(String?serviceName,?String?clusters)?{??
????if?(this.futureMap.get(ServiceInfo.getKey(serviceName,?clusters))?==?null)?{??
????????synchronized(this.futureMap)?{??
????????????if?(this.futureMap.get(ServiceInfo.getKey(serviceName,?clusters))?==?null)?{??
?????????????//?创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据??
????????????????ScheduledFuture<?>?future?=?this.addTask(new?HostReactor.UpdateTask(serviceName,?clusters));??
????????????????this.futureMap.put(ServiceInfo.getKey(serviceName,?clusters),?future);??
????????????}??
????????}??
????}??
}??
1.进行远程接口调用,触发服务的发现,调用NacosServerList
的getServers()
方法。传入的serviceId和对应Feign接口上的接口@FeignClient
中的名称一致。
例如,我这里调用的Feign接口是:
@FeignClient("gulimall-member")??
public?interface?MemberFeignService?{??
????@RequestMapping("/member/member/info/{id}")??
????R?info(@PathVariable("id")?Long?id);??
}??
这里可以看出来,返回的是一个Instance类型的List,对应的服务也发现并返回了。
2.这里则调用了NacosNamingService
的selectInstances()
方法,我这里的subscribe值是true,即代表我这个消费者直接订阅了这个服务,因此最终的信息是从本地Map中获取,即Nacos维护了一个注册列表。
3.再看下HostReactor的getServiceInfo()
方法:最终所需要的结果是从serviceInfoMap
中获取,并且通过多个Map进行维护服务实例,若存在数据的变化,还会通过强制睡眠5秒钟的方式来等待数据的更新。
4.无论怎样都会调用this.scheduleUpdateIfAbsent(serviceName, clusters)
方法。
5.通过scheduleUpdateIfAbsent()
方法定时的获取实时的实例数据,并且负责维护本地的服务注册列表,若服务发生更新,则更新本地的服务数据。
经常有人说过,Nacos有个好处,就是当一个服务挂了之后,短时间内不会造成影响,因为有个本地注册列表,在服务不更新的情况下,服务还能够正常的运转,其原因如下:
最后,服务发现的流程就是:
NacosServerList
类中的getServers()
方法,根据远程调用接口上@FeignClient
中的属性作为serviceId传入NacosNamingService.selectInstances()
方法中进行调用。HostReactor.getServiceInfo()
来获取服务的信息(serviceInfo),Nacos本地注册列表由3个Map来共同维护。本地Map–>serviceInfoMap, 更新Map–>updatingMap 异步更新结果Map–>futureMap, 最终的结果从serviceInfoMap当中获取。
getServiceInfo()
方法通过this.scheduleUpdateIfAbsent()
方法和updateServiceNow()
方法实现服务的定时更新和立刻更新。scheduleUpdateIfAbsent()
方法,则通过线程池来进行异步的更新,将回调的结果(Future)保存到futureMap中,并且发生提交线程任务时,还负责更新本地注册列表中的数据。欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢:
已在知识星球更新源码解析如下:
最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了?MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。
提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。
获取方式:点“在看”,关注公众号并回复?666?领取,更多内容陆续奉上。
文章有帮助的话,在看,转发吧。谢谢支持哟 (*^__^*)