es中通过一个优先级的线程池PrioritizedEsThreadPoolExecutor来根据线程的优先级来控制优先顺序。

类的继承关系非常的清楚,继承自EsThreadPoolExecutor。

PrioritizedEsThreadPoolExecutor中的线程任务优先级先后关系通过一个队列来实现,当两个线程的优先级一样的时候根据FIFO的原则来确定线程任务的前后顺序。

当任务通过execute()方法来执行的时候,普通的线程任务会被包装成TieBreakingPrioritizedRunnable,其继承自PrioritizedRunnable,实现了Runnable接口的同时也实现了Comparable接口来方便线程任务先后顺序的确定。

public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {command = wrapRunnable(command);doExecute(command);if (timeout.nanos() >= 0) {if (command instanceof TieBreakingPrioritizedRunnable) {((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);} else {// We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask// and passed it to execute, which doesn't make much sensethrow new UnsupportedOperationException("Execute with timeout is not supported for future tasks");}}
}
@Override
protected Runnable wrapRunnable(Runnable command) {if (command instanceof PrioritizedRunnable) {if ((command instanceof TieBreakingPrioritizedRunnable)) {return command;}Priority priority = ((PrioritizedRunnable) command).priority();return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());} else if (command instanceof PrioritizedFutureTask) {return command;} else { // it might be a callable wrapper...if (command instanceof TieBreakingPrioritizedRunnable) {return command;}return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());}
}

TieBreakingPrioritizedRunnable相比PrioritizedRunnable,主要是多了一个insertionOrder,insertionOrder是有一个AtomicLong,每次一个新的线程任务要被执行的时候都会加一,并将其作为任务进入线程池的依据,以便作为判断先后。

@Override
public int compareTo(PrioritizedRunnable pr) {int res = super.compareTo(pr);if (res != 0 || !(pr instanceof TieBreakingPrioritizedRunnable)) {return res;}return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1;
}

TieBreakingPrioritizedRunnable的compareTo()方法先调用父类的compareTo()方法来比较优先级,如果相等,则通过比较insertionOrder的大小来确定先后。

实现优先级的核心在于线程池的等待队列,也就是PriorityBlockingQueue。

PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);this.timer = timer;
}

PriorityBlockingQueue的核心在于siftUpComparable()方法,这个方法将在add()将任务加入队列的时候 被调用,用来确认任务在队列当中的任务位置。

private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;
}

可以看到,在这个队列的存储更类似树,第一个元素存储在下标0的位置,1,2位置则与0进行比较,3,4位置则与1进行比较以此类推,更类似一个大根堆。

再取出来的时候则是通过dequeue()方法,虽然只是简单的取数组下标0位置的,但是也要相应的通过siftDownComparable()方法重新使得当前队列最先被调用的任务在下标0的位置,同时确保别的任务下标往前移。

private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1;           // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];if (key.compareTo((T) c) <= 0)break;array[k] = c;k = child;}array[k] = key;}
}

这个方法,从最大的节点(已空)开始从左右儿子选择大者放入空节点,并尝试在下一个循环中,把空了的儿子节点用其儿子节点较大者填上。

这个队列通过大根堆的方式高效完成线程池等待队列当中线程任务触发先后的判断。

这也是PrioritizedEsThreadPoolExecutor优先级线程池的实现。

elasticsearch中的优先级线程池相关推荐

  1. Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求

    Elasticsearch源码分析-线程池(十一) 转自:https://www.felayman.com/articles/2017/11/10/1510291570687.html 线程池 每个节 ...

  2. 工作中如何使用线程池的?自己如何定义一个线程池?

    工作中如何使用线程池的?自己如何定义一个线程池? import java.util.concurrent.*;public class MyThreadPoolDemo {public static ...

  3. 线程池异步线程中再次获取线程池资源的问题

    问题描述 在线上发生的一次问题, 在场景中有这样一个业务, 需要异步执行一个主任务, 主任务中又包含着N个子任务, 为了整个主任务能够快速处理, 又将子任务按照数量获取线程资源异步处理, 即异步线程A ...

  4. 详解线程池的作用及Java中如何使用线程池

    服务端应用程序(如数据库和 Web 服务器)需要处理来自客户端的高并发.耗时较短的请求任务,所以频繁的创建处理这些请求的所需要的线程就是一个非常消耗资源的操作.常规的方法是针对一个新的请求创建一个新线 ...

  5. java中四种线程池及poolSize、corePoolSize、maximumPoolSize

    目录 ThreadPoolExecutor重要参数 poolSize.corePoolSize.maximumPoolSize 四种线程池 newFixedThreadPool newCachedTh ...

  6. java中四种线程池的区别

    本文按: 一. 线程池的使用 二. 几种线程池的区别 三. 如何合理配置线程池 一.线程池的使用 在Java中,通常使用Executors 获取线程池.常用的线程池有以下几种: (1)CachedTh ...

  7. java中定时任务和线程池_java基于线程池和反射机制实现定时任务完整实例

    本文实例讲述了java基于线程池和反射机制实现定时任务的方法.分享给大家供大家参考,具体如下: 主要包括如下实现类: 1. Main类: 任务执行的入口: 调用main方法,开始加载任务配置并执行任务 ...

  8. 开发中为什么使用线程池的原因

    降低资源的消耗:重复利用已经创建好的线程来降低线程的创建和销毁带来的损耗 提高响应的速度:线程池中有的线程处于空闲状态,当任务来的时候,就无需创建线程就可以立即去执行 线程池会根据当前系统的特点对池内 ...

  9. apollo源码分析 感知_Kitty中的动态线程池支持Nacos,Apollo多配置中心了

    目录 回顾昨日 nacos 集成 Spring Cloud Alibaba 方式 Nacos Spring Boot 方式 Apollo 集成 自研配置中心对接 无配置中心对接 实现源码分析 兼容 A ...

最新文章

  1. MPLS的简单配置4
  2. Windows Phone 7用户界面原型截图汇总
  3. 一行命令同时修改maven项目中多个mudule的版本号
  4. PPT 下载 | 神策数据孙文亮:客户全生命周期管理从方法到实践全解析
  5. Sqoop导入导出的时候总是出现等待5分钟的问题解决办法
  6. python怎么获取时间_Python:如何从datetime.timedelta对象中获取时间?
  7. spring学习笔记05-IOC常用注解(二)
  8. MySql 你知道什么情况下适合使用Join 联表查询吗 ?
  9. dev shm php,/dev/shm 介绍 --转载
  10. 网络安全世界の迷惑认知图鉴
  11. NX/UG二次开发—装配—实现标准件库添加组件效果
  12. C程序 --- 判断闰年平年以及二月天数
  13. PS长图快速切片_PS最常用的100多个快捷键
  14. oracle+suspend+参数,oracle数据库的挂起(Suspending)和恢复(Resuming)
  15. 解决MySQL 8.x以上版本安装中出现staring the server错误
  16. 《预训练周刊》第52期:屏蔽视觉预训练、目标导向对话
  17. 相关性扫描匹配CSM与分支限界
  18. 最新版Nessus的安装
  19. 计算机映像缺失磁盘如何修复,重装Win10系统找不到硬盘的解决方法
  20. idea全局搜索问题

热门文章

  1. transferto遇到的问题java.io.FileNotFoundException: C:\Users\Administrator\AppData\Local\Temp
  2. Elasticsearch单机安装
  3. 动态规划入门(一)——数字三角形
  4. html定位 浏览器兼容,IE6浏览器不支持固定定位(position:fixed)解决方案
  5. 浅谈,JavaScript 运行机制和Event Loop
  6. 俄罗斯调查PC厂商合谋操纵市场 联想被指妨碍调查
  7. js-jquery-插件开发(一)
  8. 如何方便快速在指定文件夹打开命令行
  9. 数据导入时遭遇 ORA-01187 ORA-01110
  10. vs2010 代码混淆 代码加密