文章目录

  • 一、前言
  • 二、服务端-服务注册
  • 三、客户端-服务发现
    • (1)注册listener
    • (2)subscribe
  • 四、服务端-服务发现
  • 五、小结

一、前言

上篇博客介绍了客户端服务注册的流程,本篇介绍服务端的服务注册,服务发现等核心流程。

二、服务端-服务注册

入口在InstanceController的register(),核心逻辑在ServiceManager类中。

ServiceManager:核心服务管理类,管理服务、实例信息。包含nacos的服务注册表。

//Register an instance to a service in AP mode.
//先创建service,校验service,之后创建instance
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {createEmptyService(namespaceId, serviceName, instance.isEphemeral());Service service = getService(namespaceId, serviceName);checkServiceIsNull(service, namespaceId, serviceName);addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {createServiceIfAbsent(namespaceId, serviceName, local, null);
}public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {Service service = getService(namespaceId, serviceName);if (service == null) {Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}
}//添加instance,获取service后,使用synchronized锁住service,防止并发操作。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)throws NacosException {String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);Service service = getService(namespaceId, serviceName);synchronized (service) {List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);Instances instances = new Instances();instances.setInstanceList(instanceList);consistencyService.put(key, instances);}
}

ServiceManager的init()还做了一些初始化工作:


@PostConstruct
public void init() {//service reporter的定时任务执行器GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);//服务更新管理的定时任务执行器GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());if (emptyServiceAutoClean) {Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",cleanEmptyServiceDelay, cleanEmptyServicePeriod);// delay 60s, period 20s;// This task is not recommended to be performed frequently in order to avoid// the possibility that the service cache information may just be deleted// and then created due to the heartbeat mechanism//自动清理空的服务的执行器GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoCleaner(this, distroMapper), cleanEmptyServiceDelay,cleanEmptyServicePeriod);}try {Loggers.SRV_LOG.info("listen for service meta change");//监听meta keyconsistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);} catch (NacosException e) {Loggers.SRV_LOG.error("listen for service meta change failed!");}
}

注册表:

双重map结构


/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

三、客户端-服务发现

客户端的服务发现,可以先从源码中的例子入手。例子位于nacos-example中的NamingExample。


NamingService naming = NamingFactory.createNamingService(properties);
naming.subscribe("nacos.test.3", new AbstractEventListener() {//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.//So you can override getExecutor() to async handle event.@Overridepublic Executor getExecutor() {return executor;}@Overridepublic void onEvent(Event event) {System.out.println("serviceName: " + ((NamingEvent) event).getServiceName());System.out.println("instances from event: " + ((NamingEvent) event).getInstances());}
});

进入NacosNamingService,注册listener,订阅。


public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);
}

(1)注册listener

InstancesChangeNotifier


public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {//缓存map,用来存储listenerprivate final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();//Object锁private final Object lock = new Object();//注册listener,根据key在map中查询对应的listener set。使用synchronized锁定防止并发操作。
//若查到,在set中添加本次要添加的listener。
//若没有对应的set,新建空set就添加到map中,在set中加入listener。
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);if (eventListeners == null) {synchronized (lock) {eventListeners = listenerMap.get(key);if (eventListeners == null) {eventListeners = new ConcurrentHashSet<EventListener>();listenerMap.put(key, eventListeners);}}}eventListeners.add(listener);
}
}

(2)subscribe

NamingClientProxy

对于NamingClientProxy接口,实现类主要有这三个。上篇提到过,这里不再赘述,直接说具体业务逻辑。

NamingClientProxyDelegate


public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
//如果存在update service task,则执行serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
//校验:若sserviceInfoHolder中没有该ervice或未被订阅,就开始订阅ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}
//更新缓存中的serviceserviceInfoHolder.processServiceInfo(result);return result;
}public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());//若service为空或者error,则忽略if (isEmptyOrErrorPush(serviceInfo)) {return oldService;}serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//校验新旧service,是否发生变化。boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
//若服务发生变化,发布事件通知。if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}

NamingGrpcClientProxy


public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}
//放入缓存redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);return doSubscribe(serviceName, groupName, clusters);
}private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);synchronized (subscribes) {subscribes.put(key, redoData);}
}//订阅,构造request发起grpc  其中requestToServer() 上篇提到过,这里不做赘述。
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();
}public void subscriberRegistered(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);synchronized (subscribes) {SubscriberRedoData redoData = subscribes.get(key);if (null != redoData) {redoData.setRegistered(true);}}
}

NamingHttpClientProxy

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
}public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {final Map<String, String> params = new HashMap<String, String>(16);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));params.put(CLUSTERS_PARAM, clusters);params.put(UDP_PORT_PARAM, String.valueOf(udpPort));params.put(CLIENT_IP_PARAM, NetUtils.localIP());params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly));String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);if (StringUtils.isNotEmpty(result)) {return JacksonUtils.toObj(result, ServiceInfo.class);}return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
}

四、服务端-服务发现

客户端服务发现调用的接口在nacos-naming的InstanceController和InstanceControllerV2中。V2中也兼容了V1的逻辑。

    @GetMapping("/list")@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)public Object list(HttpServletRequest request) throws Exception {String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);String agent = WebUtils.getUserAgent(request);String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));String app = WebUtils.optional(request, "app", StringUtils.EMPTY);String env = WebUtils.optional(request, "env", StringUtils.EMPTY);String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,udpPort, clusters);return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);}public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,  boolean healthOnly) throws Exception {...String clientIP = subscriber.getIp();ServiceInfo result = new ServiceInfo(serviceName, cluster);//具体见下面...// now try to enable the pushtry {if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,StringUtils.EMPTY);cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP,subscriber.getPort(), e);cacheMillis = switchDomain.getDefaultCacheMillis();}...return result;}

Nacos Client并不是完全依赖定时任务来感知Service的变化,为了尽量的去弥补这个延迟问题,采用一个UDP的变更通知设计,客户端调用/nacos/v1/ns/instance/list接口的时候会传入一个UDP的port,在接口中会把Service订阅的其他Service加入到一个com.alibaba.nacos.naming.push.PushService#clientMap中去,如果Service中的Instance发生了变化,取出订阅了此实例的客户端列表,并通过UDP的方式进行通知。

五、小结

在naming service模块:
1、nacos-client
主要是nacos客户端的业务逻辑,与服务端的调用基本有两种方式,grpc和http,分别对应NamingGrpcClientProxy和NamingHttpClientProxy。

2、nacos-naming
nacos naming的服务端,提供对外api接口,包括对服务、实例等的查询、更新、删除等操作。

下篇计划重点介绍Grpc和UDP相关的内容。

Nacos源码系列之服务发现(二)相关推荐

  1. Nacos源码系列—关于服务注册的那些事

    点赞再看,养成习惯,微信搜索[牧小农]关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友. 项目源码地址:公众号回复 nacos,即可免费获取源码 简介 首先我们在看Nacos源码之前,要 ...

  2. Nacos源码系列——第三章(全网最经典的Nacos集群源码主线剖析)

    上两个章节讲述了Nacos在单机模式下的服务注册,发现等源码剖析过程,实战当中 其实单机是远远不够的,那么Nacos是如何在集群模式下是如何保证节点状态同步,以及服 务变动,新增数据同步的过程的!   ...

  3. Nacos源码系列——第一章(Nacos核心源码主线剖析上)

    在讲具体的源码之前,我有几点想说明下,很多开发可能觉得源码不重要,甚至觉得互联网 的知识,目前够用就可以,也不需要多么精通.的确,在大多数的公司中,你能用你的知识 解决问题就可以,不一定非要涉及到源码 ...

  4. Nacos源码系列——第二章(Nacos核心源码主线剖析下)

    上章节我这边带着大家看了下Nacos的源码,针对上节课做个总结: Nacos服务注册过程深度剖析 Nacos注册表如何防止多节点读写并发冲突 Nacos高并发支撑异步队列与内存队列剖析 Nacos心跳 ...

  5. Nacos源码系列—订阅机制的前因后果(下)

    点赞再看,养成习惯,微信搜索[牧小农]关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友. 项目源码地址:公众号回复 nacos,即可免费获取源码 事件发布 在上一节中我们讲解了在Noti ...

  6. 《一步一步看源码:Nacos》框架源码系列之一(其1,配置服务源码)

    Nacos源码 ​ 因为最近项目在做容器化处理,容器化后涉及到不同进程对同一个文件的读写,考虑到可能会存在同一文件的配置文件,可能会把彼此覆盖掉,所以这里学习一下Nacos源码. 整体结构图 ​ 这边 ...

  7. (Nacos源码解析五)Nacos服务事件变动源码解析

    Nacos源码解析系列目录 Nacos 源码编译运行 (Nacos源码解析一)Nacos 注册实例源码解析 (Nacos源码解析二)Nacos 服务发现源码解析 (Nacos源码解析三)Nacos 心 ...

  8. 源码解读_入口开始解读Vue源码系列(二)——new Vue 的故事

    作者:muwoo 转发链接:https://github.com/muwoo/blogs/blob/master/src/Vue/2.md 目录 入口开始解读Vue源码系列(一)--造物创世 入口开始 ...

  9. Spring源码系列(十二)Spring创建Bean的过程(二)

    1.写在前面 上篇博客主要Spring在创建Bean的时候,第一次调用的Bean的后置处理器的过程,同时笔者也打算将整个Spring创建的Bean的过程,通过这个系列,将Bean的创建过程给讲清楚,废 ...

  10. Android 源码系列之二十通过反射解决在HuaWei手机出现Register too many Broadcast Receivers的crash

    转载请注明出处:http://blog.csdn.net/llew2011/article/details/79054457 Android开发适配问题一直是一个让人头疼的话题,由于国内很多厂商都有对 ...

最新文章

  1. 使用第三方SDK(如微信、qq、快看、头条等),调用接口405 Method Not Allowed
  2. opencv nms 学习笔记
  3. 31 天重构学习笔记28. 为布尔方法命名
  4. mongodb studio 3t 破解无限使用脚本
  5. .jar中没有主清单属性_如何在springboot中使用PageHelper分页插件
  6. 对表头指针、表头结点,单链表删除的理解
  7. Caffe自己修改训练方法
  8. 错误率_研究发现,商业语音识别系统存在高错误率
  9. (转载)7个去伪存真的JavaScript面试题
  10. 【Java】 剑指offer(65) 不用加减乘除做加法
  11. Golang研学:defer!如何掌握并用好(延迟执行)
  12. 【Pytorch with fastai】第 1 章:你的深度学习之旅
  13. 使用YASM编程 - 06
  14. ubuntu 16.04 Titanxp 安装cuda10.0 cudnn7.6 环境
  15. android 遥控器配置文件,[转载]android万能遥控器之一--前言及发射部分的简单实现...
  16. kubectl config 命令
  17. 选项不属于HTML语言特点,JavaScript选择题
  18. MyZip Pro for Mac(专业解压缩工具)
  19. 从 java 8 到 java 17
  20. html的冒号有什么作用,vb中冒号和分号有什么作用

热门文章

  1. 逝随春花——OIer JCY 后续
  2. java 如何清除临时文件_如何删除Java中的临时文件?
  3. zabbix 参数 脚本_zabbix 自定义脚本短信报警
  4. oracle数据库interval使用,Oracle Interval类型
  5. Spring Security(14)——权限鉴定基础
  6. linux x99 测试,超频测试总结 - 技嘉X99 Phoenix SLI主板评测:综合素质爆表的主板 - 超能网...
  7. 02 shell编程之条件语句
  8. matlab 直方图均衡化、规定化 ,线性灰度级变换实现图像增强
  9. DM13: COSCon19私人回顾
  10. 帝国cms ajax,帝国CMS封装的ajax加载信息框架代码