动态线程池框架Hippo4j源码解析

项目简介

Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力。

快速开始

https://hippo4j.cn/docs/user_docs/user_guide/quick-start/

源码分析

客户端

  1. 客户端启动,会创建两个线程池,message-consumemessage-produce。(以hippo4j-spring-boot-starter-example 模块为例子)
    @Bean@DynamicThreadPoolpublic Executor messageConsumeTtlDynamicThreadPool() {String threadPoolId = MESSAGE_CONSUME;ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder().dynamicPool().threadFactory(threadPoolId).threadPoolId(threadPoolId).executeTimeOut(800L).waitForTasksToCompleteOnShutdown(true).awaitTerminationMillis(5000L).taskDecorator(new TaskTraceBuilderHandler()).build();// Ali ttl adaptation use case.Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor);return ttlExecutor;}@SpringDynamicThreadPoolpublic ThreadPoolExecutor messageProduceDynamicThreadPool() {return ThreadPoolBuilder.buildDynamicPoolById(MESSAGE_PRODUCE);}
  1. DynamicThreadPoolPostProcessor实现了BeanPostProcessor,启动会执行DynamicThreadPoolPostProcessor#postProcessAfterInitialization,会给每个动态线程池加载listener,ClientWorker#addTenantListeners。当出现变更,执行CacheData#safeNotifyListener
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {DynamicThreadPool dynamicThreadPool;try {dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);if (Objects.isNull(dynamicThreadPool)) {// Adapt to lower versions of SpringBoot.dynamicThreadPool = DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName, DynamicThreadPool.class);if (Objects.isNull(dynamicThreadPool)) {return bean;}}} catch (Exception ex) {log.error("Failed to create dynamic thread pool in annotation mode.", ex);return bean;}DynamicThreadPoolExecutor dynamicThreadPoolExecutor;if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) {dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;}DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);subscribeConfig(dynamicThreadPoolWrapper);return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;}if (bean instanceof DynamicThreadPoolWrapper) {DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean;registerAndSubscribe(dynamicThreadPoolWrapper);}return bean;}
  1. DiscoveryClient会进行注册,并且定时发送心跳。
    public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {this.httpAgent = httpAgent;this.instanceInfo = instanceInfo;this.appPathIdentifier = instanceInfo.getAppName().toUpperCase() + "/" + instanceInfo.getInstanceId();this.scheduler = new ScheduledThreadPoolExecutor(new Integer(1),ThreadFactoryBuilder.builder().daemon(true).prefix("client.discovery.scheduler").build());register();// Init the schedule tasks.initScheduledTasks();}
  1. 客户端启动的时候,DynamicThreadPoolAutoConfiguration会进行初始化ClientWorker。在IdentifyUtil静态方法块里面执行了DynamicThreadPoolServiceLoader.register(ClientNetworkService.class);,该方法的作用就是根据类名加载SPI,构造实体类存放到DynamicThreadPoolServiceLoader#SERVICES。获取到自定义的网络节点,组装成字符串IDENTIFY。
    static {DynamicThreadPoolServiceLoader.register(ClientNetworkService.class);}
  1. ClientWorker构造方法中,会开启线程。会调用/hippo4j/v1/cs/configs/listener,该方法和apollo的方法如出一辙,都是长轮询。用来监听服务器的修改操作。
executor.schedule(() -> {try {awaitApplicationComplete.await();executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition));} catch (Throwable ex) {log.error("Sub check rotate check error.", ex);}}, 1L, TimeUnit.MILLISECONDS);
  1. 当配置变更,客户端会执行CacheData#safeNotifyListener。根据线程池id获取线程池实例,GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor()ServerThreadPoolDynamicRefresh#changePoolInfo用来修改线程池信息,而Hippo4jBaseSendMessageService#sendChangeMessage是用来发送线程池变更的消息的,需要配置。

服务端

  1. 服务端处理客户端的注册服务,ApplicationController#addInstance,封装了一个Lease类,保证当前注册实例有效。而清除过期实例是用的EvictionTask。发送定时心跳,主要就是维护Lease对象,续期Lease。
    @Overridepublic void register(InstanceInfo registrant) {Map<String, Lease<InstanceInfo>> registerMap = registry.get(registrant.getAppName());if (registerMap == null) {ConcurrentHashMap<String, Lease<InstanceInfo>> registerNewMap = new ConcurrentHashMap<>();registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap);if (registerMap == null) {registerMap = registerNewMap;}}Lease<InstanceInfo> existingLease = registerMap.get(registrant.getInstanceId());if (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {registrant = existingLease.getHolder();}}Lease<InstanceInfo> lease = new Lease<>(registrant);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}registerMap.put(registrant.getInstanceId(), lease);if (InstanceStatus.UP.equals(registrant.getStatus())) {lease.serviceUp();}registrant.setActionType(InstanceInfo.ActionType.ADDED);registrant.setLastUpdatedTimestamp();}
  1. 客户端发送监听配置的请求,服务端会持有长轮询的请求,cLongPollingService#addLongPollingClientClientLongPolling是一个Runnable,从请求里面获取clientIdentify,再判断和当前的配置是否一致,不一致则返回最新的数据。
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER);int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());} else {List<String> changedGroups = Md5ConfigUtil.compareMd5(req, clientMd5Map);if (!changedGroups.isEmpty()) {generateResponse(rsp, changedGroups);return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {log.info("New initializing cacheData added in.");return;}}String clientIdentify = RequestUtil.getClientIdentify(req);final AsyncContext asyncContext = req.startAsync();asyncContext.setTimeout(0L);ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize,timeout - delayTime, Pair.of(req.getHeader(CLIENT_APP_NAME_HEADER), req.getHeader(CLIENT_VERSION))));}

总结

  1. 服务端类似于注册中心(eureka),会将客户端的信息展示出来;同样,服务端也是一个配置中心(nacos),当服务端的配置有修改,客户端会监听到服务端的配置变更,对线程池的改变实时生效。

【开源项目】动态线程池框架Hippo4j源码解析相关推荐

  1. 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 )

    文章目录 一.线程池 execute 方法源码解析 二.线程池 execute 方法完整源码及注释 一.线程池 execute 方法源码解析 进入 ThreadPoolExecutor 中 , 查看线 ...

  2. c++ socket线程池原理_ThreadPoolExecutor线程池实现原理+源码解析

    推荐学习 被微服务轰炸?莫怕!耗时35天整出的「微服务学习教程」送你 死磕「并发编程」100天,全靠阿里大牛的这份最全「高并发套餐」 闭关28天,奉上[Java一线大厂高岗面试题解析合集],备战金九银 ...

  3. Java线程池execute()方法源码解析

    先看作者给出的注释来理解线程池到底有什么作用 * Thread pools address two different problems: they usually * provide improve ...

  4. 美团:某动态线程池框架是官方开源的么?

    大家好,我是马称. 最近,有很多同学在微信上问我这么一个问题: Hippo4j 动态线程池框架是美团开源的么? 类似于这样的问题还挺多,在这里统一回复下: 美团官方并没有开源任何关于动态线程池的框架. ...

  5. vue开源项目(各大插件,gitup源码)

    vue开源项目(各大插件,gitup源码) 目录 UI组件 开发框架 实用库 服务端 辅助工具 应用实例 Demo示例 UI组件 element★31142 - 饿了么出品的Vue2的web UI工具 ...

  6. Java线程池状态判断源码_深入浅出Java线程池:源码篇

    前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...

  7. 【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

    文章目录 一.线程池执行任务细节分析 二.线程池执行 execute 源码分析 一.线程池执行任务细节分析 线程池执行细节分析 : 核心线程数 101010 , 最大小成熟 202020 , 非核心线 ...

  8. 并发编程之 Executor 线程池原理与源码解读

    并发编程之 Executor 线程池原理与源码解读 线程是调度 CPU 资源的最小单位,线程模型分为 KLT 模型与 ULT 模型,JVM使用的是 KLT 模型.java线程与 OS 线程保持 1:1 ...

  9. 深度学习框架Caffe源码解析

    作者:薛云峰(https://github.com/HolidayXue),主要从事视频图像算法的研究, 本文来源微信公众号:深度学习大讲堂.  原文:深度学习框架Caffe源码解析  欢迎技术投稿. ...

最新文章

  1. Please make sure you have the correct access rights and the repository exists.问题解决
  2. 监控软件nagios之安装
  3. 数据结构——交换左右子树
  4. html浮动炫酷样式,jQuery和CSS3炫酷表单浮动标签特效
  5. 【MyBatis框架】高级映射-多对多查询
  6. Vhost and VIOMMU
  7. Spring的XML解析原理,java软件开发面试常见问题
  8. 加密解密你了解多少?
  9. java 服务端解决ajax跨域问题
  10. visual studio使用小技巧(以vs2012为例)
  11. 载体构建实例解析——构建 SETD3-pEGFP-N1(Snapgene 设计引物)
  12. 计算机搜不到连接打印机主机,电脑搜索不到局域网打印机怎么办
  13. 01-探寻 JavaScript 反爬虫的根本原因
  14. C++ 罗马数字转整数
  15. sbrkr.c:(.text._sbrk_r+0xc): undefined reference to `_sbrk'
  16. Navicat 创建数据库
  17. win10卸载python3
  18. SQL Server 2014如何修改数据库名
  19. 西南交大计算机专硕薪资,考研高校就业和薪酬数据统计:西南交通大学
  20. 类似 marquee 的图片左右滑动且无缝对接

热门文章

  1. 构筑立体世界,AR Engine助力B站会员购打造沉浸式营销
  2. js输入两个数,求最小公倍数和最大公约数
  3. 【算法-初级-数组】两数之和(JavaScript实现)
  4. File类与IO流知识总结(三):缓冲流、转换流、序列化流
  5. halcon 相机标定
  6. 小米副总裁崔宝秋离职 原掌管的清河大学并入人力资源部
  7. pythonset使用教程_set常见方法_清华尹成python入门教程_少儿编程视频-51CTO学院
  8. 02. 数据库的选择
  9. 记住下次看小电影前,一定要检查域名是不是 HTTPS 的!
  10. 教你在ubuntu系统下制作win10的启动u盘,建议收藏起来-win10 u盘