本文主要研究一下nacos server的PushService

PushService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {@Autowiredprivate SwitchDomain switchDomain;private ApplicationContext applicationContext;private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);private static final int MAX_RETRY_TIMES = 1;private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap= new ConcurrentHashMap<String, Receiver.AckEntry>();private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap= new ConcurrentHashMap<String, ConcurrentMap<String, PushClient>>();private static volatile ConcurrentHashMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<String, Long>();public static volatile ConcurrentHashMap<String, Long> pushCostMap = new ConcurrentHashMap<String, Long>();private static int totalPush = 0;private static int failedPush = 0;private static ConcurrentHashMap<String, Long> lastPushMillisMap = new ConcurrentHashMap<>();private static DatagramSocket udpSocket;private static Map<String, Future> futureMap = new ConcurrentHashMap<>();private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("com.alibaba.nacos.naming.push.retransmitter");return t;}});private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("com.alibaba.nacos.naming.push.udpSender");return t;}});static {try {udpSocket = new DatagramSocket();Receiver receiver = new Receiver();Thread inThread = new Thread(receiver);inThread.setDaemon(true);inThread.setName("com.alibaba.nacos.naming.push.receiver");inThread.start();executorService.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {removeClientIfZombie();} catch (Throwable e) {Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");}}}, 0, 20, TimeUnit.SECONDS);} catch (SocketException e) {Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}//......public static void removeClientIfZombie() {int size = 0;for (Map.Entry<String, ConcurrentMap<String, PushClient>> entry : clientMap.entrySet()) {ConcurrentMap<String, PushClient> clientConcurrentMap = entry.getValue();for (Map.Entry<String, PushClient> entry1 : clientConcurrentMap.entrySet()) {PushClient client = entry1.getValue();if (client.zombie()) {clientConcurrentMap.remove(entry1.getKey());}}size += clientConcurrentMap.size();}if (Loggers.PUSH.isDebugEnabled()) {Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size);}}//......
}
  • PushService实现了ApplicationContextAware、ApplicationListener接口;它有两个ScheduledExecutorService,一个用于retransmitter,一个用于udpSender;其static代码块创建了一个deamon线程执行Receiver,同时注册了一个定时任务执行removeClientIfZombie,它会遍历clientMap,移除zombie的client

Receiver

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

public static class Receiver implements Runnable {@Overridepublic void run() {while (true) {byte[] buffer = new byte[1024 * 64];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);try {udpSocket.receive(packet);String json = new String(packet.getData(), 0, packet.getLength(), Charset.forName("UTF-8")).trim();AckPacket ackPacket = JSON.parseObject(json, AckPacket.class);InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();String ip = socketAddress.getAddress().getHostAddress();int port = socketAddress.getPort();if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);}String ackKey = getACKKey(ip, port, ackPacket.lastRefTime);AckEntry ackEntry = ackMap.remove(ackKey);if (ackEntry == null) {throw new IllegalStateException("unable to find ackEntry for key: " + ackKey+ ", ack json: " + json);}long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);Loggers.PUSH.info("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}",json, ip, port, pushCost, ackMap.size(), totalPush);pushCostMap.put(ackKey, pushCost);udpSendTimeMap.remove(ackKey);} catch (Throwable e) {Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);}}}//......public static class AckPacket {public String type;public long lastRefTime;public String data;}
}
  • Receiver实现了Runnable接口,其run方法使用while true循环来执行udpSocket.receive,之后解析AckPacket,从ackMap移除该ackKey,更新pushCostMap,同时从udpSendTimeMap移除该ackKey

PushClient

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

   public class PushClient {private String namespaceId;private String serviceName;private String clusters;private String agent;private String tenant;private String app;private InetSocketAddress socketAddr;private DataSource dataSource;private Map<String, String[]> params;public Map<String, String[]> getParams() {return params;}public void setParams(Map<String, String[]> params) {this.params = params;}public long lastRefTime = System.currentTimeMillis();public PushClient(String namespaceId,String serviceName,String clusters,String agent,InetSocketAddress socketAddr,DataSource dataSource,String tenant,String app) {this.namespaceId = namespaceId;this.serviceName = serviceName;this.clusters = clusters;this.agent = agent;this.socketAddr = socketAddr;this.dataSource = dataSource;this.tenant = tenant;this.app = app;}public DataSource getDataSource() {return dataSource;}public PushClient(InetSocketAddress socketAddr) {this.socketAddr = socketAddr;}public boolean zombie() {return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);}@Overridepublic String toString() {return "serviceName: " + serviceName+ ", clusters: " + clusters+ ", ip: " + socketAddr.getAddress().getHostAddress()+ ", port: " + socketAddr.getPort()+ ", agent: " + agent;}public String getAgent() {return agent;}public String getAddrStr() {return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();}public String getIp() {return socketAddr.getAddress().getHostAddress();}@Overridepublic int hashCode() {return Objects.hash(serviceName, clusters, socketAddr);}@Overridepublic boolean equals(Object obj) {if (!(obj instanceof PushClient)) {return false;}PushClient other = (PushClient) obj;return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);}public String getClusters() {return clusters;}public void setClusters(String clusters) {this.clusters = clusters;}public String getNamespaceId() {return namespaceId;}public void setNamespaceId(String namespaceId) {this.namespaceId = namespaceId;}public String getServiceName() {return serviceName;}public void setServiceName(String serviceName) {this.serviceName = serviceName;}public String getTenant() {return tenant;}public void setTenant(String tenant) {this.tenant = tenant;}public String getApp() {return app;}public void setApp(String app) {this.app = app;}public InetSocketAddress getSocketAddr() {return socketAddr;}public void refresh() {lastRefTime = System.currentTimeMillis();}}
  • PushClient封装了要推送的目标服务地址等信息,它提供了zombie方法来判断目标服务是否zombie,它判断距离lastRefTime的时间差是否超过switchDomain指定的该serviceName的PushCacheMillis(默认为10秒),超过则判定为zombie

PushService.onApplicationEvent

@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {//......@Overridepublic void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();String serviceName = service.getName();String namespaceId = service.getNamespaceId();Future future = udpSender.schedule(new Runnable() {@Overridepublic void run() {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map<String, Object> cache = new HashMap<>(16);long lastRefTime = System.nanoTime();for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie: " + client.toString());clients.remove(client.toString());Loggers.PUSH.debug("client is zombie: " + client.toString());continue;}Receiver.AckEntry ackEntry;Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null;Map<String, Object> data = null;if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);compressData = (byte[]) (pair.getValue0());data = (Map<String, Object>) pair.getValue1();Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));}}Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));udpPush(ackEntry);}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}}, 1000, TimeUnit.MILLISECONDS);futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}//......public void serviceChanged(Service service) {// merge some change events to reduce the push frequency:if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {return;}this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));}//......
}
  • onApplicationEvent会处理ServiceChangeEvent,它会注册一个延时任务并将该future放入futureMap;该延时任务会从clientMap获取指定namespaceId, serviceName的clients;然后遍历clients判断是否是zombie,如果是的话则移除该client,否则创建Receiver.AckEntry,然后执行udpPush(ackEntry),最后从futureMap移除该future;serviceChanged方法提供给外部调用发布ServiceChangeEvent

PushService.udpPush

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {//......public static class Receiver implements Runnable {//......public static class AckEntry {public AckEntry(String key, DatagramPacket packet) {this.key = key;this.origin = packet;}public void increaseRetryTime() {retryTimes.incrementAndGet();}public int getRetryTimes() {return retryTimes.get();}public String key;public DatagramPacket origin;private AtomicInteger retryTimes = new AtomicInteger(0);public Map<String, Object> data;}//......} private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {if (ackEntry == null) {Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");return null;}if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return ackEntry;}try {if (!ackMap.containsKey(ackEntry.key)) {totalPush++;}ackMap.put(ackEntry.key, ackEntry);udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());Loggers.PUSH.info("send udp packet: " + ackEntry.key);udpSocket.send(ackEntry.origin);ackEntry.increaseRetryTime();executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),TimeUnit.MILLISECONDS);return ackEntry;} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return null;}}//......
}
  • udpPush方法会根据Receiver.AckEntry的信息进行判断,如果其重试次数大于MAX_RETRY_TIMES则终止push,将其从ackMap、udpSendTimeMap中移除;如果可以重试则将其ackEntry.key放入ackMap及udpSendTimeMap,然后执行udpSocket.send(ackEntry.origin)及ackEntry.increaseRetryTime(),并注册Retransmitter的延时任务;如果出现异常则将其从ackMap、udpSendTimeMap移除

Retransmitter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

public static class Retransmitter implements Runnable {Receiver.AckEntry ackEntry;public Retransmitter(Receiver.AckEntry ackEntry) {this.ackEntry = ackEntry;}@Overridepublic void run() {if (ackMap.containsKey(ackEntry.key)) {Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);udpPush(ackEntry);}}
}
  • Retransmitter实现了Runnable方法,其run方法在ackMap包含ackEntry.key的条件下执行udpPush重试

小结

  • PushService实现了ApplicationContextAware、ApplicationListener接口
  • 其static代码块创建了一个deamon线程执行Receiver,同时注册了一个定时任务执行removeClientIfZombie,它会遍历clientMap,移除zombie的client
  • 其onApplicationEvent会处理ServiceChangeEvent,它会注册一个延时任务并将该future放入futureMap;该延时任务会从clientMap获取指定namespaceId, serviceName的clients;然后遍历clients判断是否是zombie,如果是的话则移除该client,否则创建Receiver.AckEntry,然后执行udpPush(ackEntry),最后从futureMap移除该future;serviceChanged方法提供给外部调用发布ServiceChangeEvent

doc

  • PushService

(想自学习编程的小伙伴请搜索圈T社区,更多行业相关资讯更有行业相关免费视频教程。完全免费哦!)

聊聊nacos server的PushService相关推荐

  1. server push java_聊聊nacos server的PushService

    序 本文主要研究一下nacos server的PushService PushService nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/na ...

  2. 问君能有几多愁,恰似不懂Linux SQL如何调优——聊聊SQL Server on Linux最佳实践

    问君能有几多愁,恰似不懂Linux SQL如何调优--聊聊SQL Server on Linux最佳实践 自从微软开始拥抱Linux, SQL Server 很快就推出了 Linux版本, linux ...

  3. Nacos Server did not start because dumpservice bean construction failure : No DataSource set异常解决

    在本地搭建Nacos+Mysql,启动Nacos的时候,发现一直报"Nacos Server did not start because dumpservice bean construct ...

  4. 安装Nacos Server

    1.预备环境准备 Nacos依赖java环境来运行.如果您是从代码开始构建并运行Nacos,还需要为此配置maven环境,请确保是在以下版本环境中安装使用; 64 bit OS Linux/Unix/ ...

  5. 聊聊Eureka Server的REST API

    序 本文主要研究下Eureka Server的REST API ApplicationsResource eureka-core-1.8.8-sources.jar!/com/netflix/eure ...

  6. nacos server 安装报错 macOS 10.12.6

    如题,系统版本macOS 10.12.6,起初选的官网教程的nacos-server-2.1.1版本启动的,报错 报错信息 Caused by: org.springframework.beans.B ...

  7. nacos server默认的登录账号密码

    默认的账号密码都是 nacos 登录成功后,如下:

  8. nacos server 1.3 网盘地址

    链接: https://pan.baidu.com/s/13aHevI7_XNo8_PSI4S1uFA 提取码: cr9g

  9. sql server配置管理器在哪里看ip_微服务管理平台nacos虚拟ip负载均衡集群模式搭建...

    一.Nacos简介 Nacos是用于微服务管理的平台,其核心功能是服务注册与发现.服务配置管理. Nacos作为服务注册发现组件,可以替换Spring Cloud应用中传统的服务注册于发现组件,如:E ...

  10. nacos启动失败:org.springframework.boot.web.server.WebServerExceptio

    准备环境 系统环境: windows nacos: 2.0.0-BETA 错误信息 org.springframework.context.ApplicationContextException: U ...

最新文章

  1. matlab去除图片水印_Python利用OpenCV去除图片水印
  2. Oracle技术之Data Pump介绍
  3. session的常用方法。
  4. linux 信号传递函数,Linux Shell之七 函数应用
  5. Spring Cloud【Finchley】-08使用Hystrix实现容错
  6. buntu 8.04 安装 mysql 无提示输入用户名 密码 ,怎么修改
  7. 唐尼vr眼镜好吗_科普:眼镜片有哪些种类?镜片越薄越好吗?无框眼镜配什么材质?...
  8. 小氓男-灰色按钮激活V1.0
  9. python搭建selenium_了解搭建selenium安装以及配置环境
  10. AtCoder Beginner Contest 211 E - Red Polyomino(暴力+状态记录)
  11. [转]SQL语句资料
  12. 如何在SQL Server 2019中添加数据敏感度分类的命令
  13. Linux之父:开源不为金钱名利只为娱乐
  14. WordPress无其他语言解决方法
  15. 使用Builder模式创建复杂可选参数对象
  16. 【渝粤教育】国家开放大学2018年春季 0064-21T20世纪欧美文学 参考试题
  17. 【转载】对SVM的个人理解---浅显易懂
  18. 8.声卡驱动06-自己实现alsa驱动-虚拟声卡-widget
  19. 掌握到胃-奈氏图与伯德图的绘制
  20. python画海绵宝宝代码_Python_Turtle库画一只派大星

热门文章

  1. 【JZOJ 6080】【GDOI2019模拟2019.3.23】IOer
  2. 【波导】——理解群速度和相速度
  3. MySQL的函数-窗口函数
  4. 1433端口被运营商封锁的解决方法
  5. java文字竖排_Java输出竖排文字
  6. 页面自动添加font标签
  7. HTML5录制音频文件
  8. 580刷590bios_身价瞬间涨几百 RX 480刷bios变身RX 580
  9. 分区助手扩大c盘后自动修复_怎么使用扩展分区向导来扩大C盘
  10. 省考计算机专业课考什么,计算机考研专业课考什么