本文分析Nacos基于Nacos 2.0

Nacos中服务注册中心默认是AP模式,如果设置为CP模式
那么客户端设置 spring.cloud.nacos.discovery.ephemeral=false (默认为true) ,表示是启用AP模式

接下来我们看看Nacos中对于AP、CP模式是怎么实现的。

首先说明一下,在Nacos中默认基于HTTP的端口号是8848 ,Nacos2.0增加了gRPC,而gRPC有两个地方,一个是和客户端通信,一个是集群节点之间的通信。 Nacos中gRPC的端口号都是基于HTTP端口进行一定的漂移,客户端通信端口是漂移1000,即默认为:8848+1000=9848,而集群之间的通信端口漂移 1001,即8848+·1001=9849,这里需要注意如果网络安全需要开启相关端口的话,那么这里需要开通,8848,9848,9849,三个端口号

AP模式

在Nacos服务注册,即naming服务中,对于AP模式,是采用gRPC通信的,而协议则是自己实现了一个名为Distro协议。我们看下客户端是怎么进行服务注册的,客户端进行服务注册主要是通过NacosNamingService来实现的:

    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)throws NacosException {Instance instance = new Instance();instance.setIp(ip);instance.setPort(port);instance.setWeight(1.0);instance.setClusterName(clusterName);registerInstance(serviceName, groupName, instance);}public void registerInstance(String serviceName, Instance instance) throws NacosException {registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);}public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);clientProxy.registerService(serviceName, groupName, instance);}

而这里最终会通过clientProxy去进行服务注册,实现为NamingClientProxyDelegate:

 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);}
private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;}

可以看到在NamingClientProxyDelegate中会判断注册的服务的实例是否是临时的,如果是临时的则用gRPCClient否则httpClient请求
我们知道,默认instance.isEphemeral=true即是临时的,应采用gRPCClient去进行服务注册。
这里gRPCClient实现为NamingGrpcClientProxy:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,instance);InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,NamingRemoteConstants.REGISTER_INSTANCE, instance);requestToServer(request, Response.class);namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance);}

最后通过gRPC向服务端发送了一个InstanceRequest请求。
到这里,客户端就完成了服务注册请求的发送,接下来看看服务端怎么处理的。
服务端gRPC实现都是继承BaseGrpcServer,其子类主要是不同线程池的选择,其中GrpcSdkServer用来处理和客户端之间的通信,GrpcClusterServer用来集群节点之间的通信
而相关请求在GrpcRequestAcceptor根据不同请求类型获取RequestHandlerRegistry对应的RequestHandler进行处理。

这里服务注册客户端发送的请求类型为InstanceRequest,请求将交由InstanceRequestHandler处理:

public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE:return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE:return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM,String.format("Unsupported request type %s", request.getType()));}}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}

可以看到,InstanceRequestHandler将会处理服务注册和服务下线两个处理,而这里实际处理会交给AP模式的实现EphemeralClientOperationServiceImpl:

public void registerInstance(Service service, Instance instance, String clientId) {Service singleton = ServiceManager.getInstance().getSingleton(service);Client client = clientManager.getClient(clientId);InstancePublishInfo instanceInfo = getPublishInfo(instance);client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}

这里可以看到,在AP模式下,首先就会将注册的实例信息通过clientManager获取到对应的Client信息,直接写入到Client中,而这里写入的则是一个InstancePublishInfo的信息,后续会通过ClientManager能够获取到各个客户端节点发布的服务信息

这里比较需要注意的是client.addServiceInstance(singleton, instanceInfo);,在Nacos中AP是将服务信息放入到了client和ServiceManager中。而client的addServiceInstance实现如下:

public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {if (null == publishers.put(service, instancePublishInfo)) {MetricsMonitor.incrementInstanceCount();}NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;}

这里会发布ClientEvent.ClientChangedEvent事件,然后会异步的将注册的服务信息同步和集群其他节点,而这个事件的处理在DistroClientDataProcessor:

public void onEvent(Event event) {if (EnvUtil.getStandaloneMode()) {return;}if (!upgradeJudgement.isUseGrpcFeatures()) {return;}if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}

最终会走syncToAllServer的逻辑:

// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event) {Client client = event.getClient();if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}
// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}

具体实现则是在DistroProtocol中,则是一个Distro协议的实现,这里最后是封装成一个DistroSyncChangeTask,实现了Runnable接口:

public void run() {String type = getDistroKey().getResourceType();DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());if (transportAgent.supportCallbackTransport()) {doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}}private void executeDistroTask() {try {boolean result = doExecute();if (!result) {handleFailedTask();}Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);} catch (Exception e) {handleFailedTask();}}

我们看看doExecute:

protected boolean doExecute() {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return true;}return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());}

这里会获取需要同步的数据,通过getDistroData,而最终会调用DistroClientDataProcessor.getDistroData:

public DistroData getDistroData(DistroKey distroKey) {Client client = clientManager.getClient(distroKey.getResourceKey());if (null == client) {return null;}byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data);}

通过clientManager获取到了该客户端节点注册的实例的信息。
另外一个就是在NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));发布该事件的时候,会调用ClientServiceIndexesManager.onEvent方法,添加到ClientServiceIndexesManager.publisherIndexes 这是一个Map<Service, Set<String>>结构,记录了一个服务的所有提供方,然后会发布ServiceEvent.ServiceChangedEvent事件,这个事件最后会在NamingSubscriberServiceV2Impl.onEvent中处理,会通过push方式(gRPC)将新上线的服务信息推送给消费方

另外在服务注册的时候发布ClientOperationEvent.ClientRegisterServiceEvent的时候也会在ClientServiceIndexesManager中处理该事件:

private void addPublisherIndexes(Service service, String clientId) {publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());publisherIndexes.get(service).add(clientId);NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}

可以看到,处理很简单就是将发布的服务信息和对应的客户端节点放入到了一个ConcurrentMap<Service, Set<String>>中,这样
后面需要获取某个服务的所有的实例的时候,通过ClientServiceIndexesManager和Service能够获取到所有的该服务实例的clientId,然后通过ClientManager和clientId集合能够获取到该服务所有实例节点信息

CP模式

对于CP模式来说,Nacos2使用了Raft协议,在Nacos2中则是使用阿里开源的SOFA-Jraft来实现Raft协议,想了解的可以看这篇 Raft算法实现 - Sofa-JRaft,选主,数据写入,日志复制

客户端则是通过NamingHttpClientProxy模式发送相关请求。

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);if (instance.isEphemeral()) {BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);beatReactor.addBeatInfo(groupedServiceName, beatInfo);}final Map<String, String> params = new HashMap<String, String>(16);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, groupedServiceName);params.put(CommonParams.GROUP_NAME, groupName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);}

在服务端则是InstanceController进行处理:

@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception {final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);final Instance instance = parseInstance(request);getInstanceOperator().registerInstance(namespaceId, serviceName, instance);return "ok";}

最终在PersistentServiceProcessor.put中进行持久化:

public void put(String key, Record value) throws NacosException {final BatchWriteRequest req = new BatchWriteRequest();Datum datum = Datum.createDatum(key, value);req.append(ByteUtils.toBytes(key), serializer.serialize(datum));final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req))).setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();try {protocol.write(request);} catch (Exception e) {throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());}}

这里的protocol实现为JRaftProtocol:

// JRaftProtocol.java
public Response write(WriteRequest request) throws Exception {CompletableFuture<Response> future = writeAsync(request);return future.get(10_000L, TimeUnit.MILLISECONDS);}
// JRaftServer.java
public CompletableFuture<Response> commit(final String group, final Message data,final CompletableFuture<Response> future) {LoggerUtils.printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data);final RaftGroupTuple tuple = findTupleByGroup(group);if (tuple == null) {future.completeExceptionally(new IllegalArgumentException("No corresponding Raft Group found : " + group));return future;}FailoverClosureImpl closure = new FailoverClosureImpl(future);final Node node = tuple.node;if (node.isLeader()) {applyOperation(node, data, closure);} else {invokeToLeader(group, data, rpcRequestTimeoutMs, closure);}return future;}

可以看到,这里如果当前节点不是Leader节点, 则当前节点会查找Leader节点,然后将请求转发给Leader节点进行数据的写入。
在Nacos的Raft状态机实现为NacosStateMachine
当我们将服务注册的信息成功写入Raft集群过半节点之后,会触发NacosStateMachine.onApply方法,核心逻辑主要如下:

if (message instanceof WriteRequest) {Response response = processor.onApply((WriteRequest) message);postProcessor(response, closure);}if (message instanceof ReadRequest) {Response response = processor.onRequest((ReadRequest) message);postProcessor(response, closure);}

最终会调用PersistentClientOperationServiceImpl.onApply方法:

private void onInstanceRegister(Service service, Instance instance, String clientId) {Service singleton = ServiceManager.getInstance().getSingleton(service);Client client = clientManager.computeIfAbsent(clientId, () -> new IpPortBasedClient(clientId, false));InstancePublishInfo instancePublishInfo = getPublishInfo(instance);client.addServiceInstance(singleton, instancePublishInfo);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));}

而这块的代码则是和AP模式的基本类似,这里在 client.addServiceInstance逻辑中会发布相关事件,通知服务消费端服务的变更信息,和AP模式一样也是通过PushExecutorDelegate代理来选择通过什么方式推送给消费客户端,而选择的逻辑则是判断建立连接的clientId是否包含"#",如果包含则采用UDP的实现方式:PushExecutorUdpImpl 。对于CP模式下采用HTTP请求,其clientId通过如下方式拼接:address + "#" + ephemeral
因此,CP模式下采用UDP来通知对应的服务消费端,而在服务消费客户端则有一个对应的PushReceiver来接收服务端返回的信息,这是一个Runnable实现类,run方法逻辑如下:

public void run() {while (!closed) {try {byte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);udpSocket.receive(packet);String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {serviceInfoHolder.processServiceInfo(pushPacket.data);// send ack to serverack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"\"}";} else if ("dump".equals(pushPacket.type)) {// dump data to serverack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))+ "\"}";} else {// do nothing send ack onlyack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime+ "\", \"data\":" + "\"\"}";}udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,packet.getSocketAddress()));} catch (Exception e) {if (closed) {return;}}}}

如果有节点或者服务变化,会调用serviceInfoHolder.processServiceInfo(pushPacket.data);进行处理,另外,这里的UDP端口号取值逻辑为: 如果配置了系统变量push.receiver.udp.port那么取该值,否则随机生成,PushReceiver则会在不断监听来自UDP信息并处理。

另外,在CP模式下,客户端在注册服务信息的同时,对于每个服务会启动一个心跳服务,默认每隔5S时间会发送服务实例的心跳信息。

总结一下,在Nacos2服务发现的的使用场景下,使用CP模式还是AP模式主要是看服务提供客户端选择,如果服务注册指定为临时,那么走AP模式,否则走CP模式。

而对于服务消费客户端来说,订阅服务的的时候都是通过gRPC发布订阅,在消费服务客户端启动的时候同时启动了gRPC和HTTP两种通信方式,在HTTP冲初始化了PushReceiver也就是UDP监听方式,如果服务是AP模式,那么当服务提供方发生变动的时候则通过gRPC来通知服务消费方,如果是CP模式,那么通过UDP方式来通知消费方。
可以看到在管理服务注册信息时,CP模式与APM模式基本一样的,只不过AP模式上来就写入,而CP模式则是必须通过Raft算法集群过半节点写入成功之后才写入。

另外Nacos中不管CP还是AP模式获取服务注册成功都会向ClientServiceIndexesManagerClientManager写入信息,需要获取某个服务的所有的实例的时候,通过ClientServiceIndexesManager和Service能够获取到所有的该服务实例的clientId,然后通过ClientManager和clientId集合能够获取到该服务所有实例节点信息`

Nacos中服务注册中心AP、CP模式实现,AP、CP模式切换相关推荐

  1. Spring Cloud Alibaba教程:使用Nacos作为服务注册中心

    点击上方"方志朋",选择"置顶公众号" 技术文章第一时间送达! 什么是Nacos? Nacos 致力于帮助您发现.配置和管理微服务.Nacos 提供了一组简单易 ...

  2. OpenFeign组件的使用(使用nacos作为服务注册中心)

    一.OpenFeign介绍 Feign是一个声明式的伪Http客户端,它使得写Http客户端变得更简单.使用Feign,只需要创建一个接口并注解.它具有可插拔的注解特性(可以使用springmvc的注 ...

  3. eureka服务注册中心集群模式创建

    Eureka组件是springcloud提供的服务注册与发现中心组件 这里创建Eureka server 端的一个集群环境 Eureka服务端运行依赖于spring工程,其集群环境搭建,需要创建spr ...

  4. Nacos作为服务注册中心及负载均衡、服务流量权重设置

    如果我的博客对你有帮助,欢迎进行评论✏️✏️.点赞

  5. 无敌的服务注册中心Spring CloudAlibaba Nacos不进来看一看吗?

    目录​​​​​​​ 1.Nacos概述 2.Nacos安装运行 安装 运行 3.Nacos作为服务注册中心 服务提供者order-service 服务消费者user-service 4.Nacos作为 ...

  6. java 服务注册中心_服务治理的含义和java最流行的微服务框架服务治理注册中心的搭建...

    原标题:服务治理的含义和java最流行的微服务框架服务治理注册中心的搭建 Spring Cloud Eureka基于Netflix Eureka做了二次封装,是Spring Cloud Netflix ...

  7. 服务注册中心AP和CP区别【Nacos|Eureka|Consul|Zookeeper】

    当下,分布式系统正变得越来越重要,大型网站几乎都是分布式的.分布式系统的最大难点,就是各个节点的状态如何同步.CAP 定理是这方面的基本定理,也是理解分布式系统的起点. CAP定理,又被称作布鲁尔定理 ...

  8. 分布式服务框架Dubbo集成Nacos框架实现注册中心

    在讨论Nacos之前,我们先讨论一下CAP理论 CAP理论是分布式场景绕不开的重要理论 一致性:所有节点在同一时间具有一样的数据: 可用性:保证每个请求不管成功还是失败都有响应: 分区容忍性:系统中任 ...

  9. 微服务2——服务的注册,调用(Nacos服务注册中心+服务调用+调用负载均衡)sca-comsumersca-provider

    一.Nacos的安装和构建  以及启动 其官网地址如下: Nacos官网 1.安装前提: 第一:确保你电脑已配置JAVA_HOME环境变量(Nacos启动时需要),例如: 第二:确保你的MySQL版本 ...

最新文章

  1. Python OpenCV GrabCut进行前景分割和提取
  2. 远程桌面怎么持续连接_如何拥有成功且可持续的远程产品管理职业
  3. ubuntu12.04 启动n卡独显方法
  4. CodeForces - 1327D Infinite Path(图论综合)
  5. ASP.NET Core中使用表达式树创建URL
  6. python练习:科赫小雪花包裹
  7. android编程多组件布局,Android把多个控件定义成一个整体的布局类使用
  8. html5 push api,HTML5+ API Reference
  9. PowerShell 笔记
  10. 《中国人工智能学会通讯》——6.7 实体链接任务及系统
  11. 这个春天我能感觉的到
  12. python的Singleton模式实现
  13. 混沌系统与复杂网络控制,神经网络模型求最优解
  14. Quartus-II之D触发器
  15. 腾讯测试王者荣耀网速的软件,4G用户怎么体验5G速度 腾讯手机管家5G测速帮你一键搞定...
  16. Android中指南针的实现
  17. 在Linux中查找和删除重复文件的4种方法
  18. 在vue中使用web3.js开发以太坊dapp
  19. 华为交换机难点学习:导出配置文件/同步时间
  20. [Unity]VRTK V4的导入和使用

热门文章

  1. C++引用的定义和使用
  2. 一本通2073:【例2.16 】三角形面积
  3. 社会杂谈:救助人与救助动物,是单选题吗?“虐狗案”
  4. 自恢复保险丝PPTC
  5. acm:::2103 铺地砖
  6. java的scope_spring中@Scope作用域的注解
  7. DHCP/DNS服务器配置与管理——2
  8. 用分布式锁来防止库存超卖,但是是每秒上千订单的高并发场景,如何对分布式锁进行高并发优化来应对这个场景?
  9. 云起实验室:基于Ubuntu搭建个人网盘
  10. Python画小仓鼠