在本文中,我们将扩展具有监视功能的ExecutorService实现。 这种监视功能将帮助我们在实时生产环境中测量多个池参数,即活动线程,工作队列大小等。 它还将使我们能够衡量任务执行时间,成功任务计数和失败任务计数。

监控库

至于监控库,我们将使用Metrics 。 为了简单起见,我们将使用ConsoleReporter ,它将向控制台报告指标。 对于生产级应用程序,我们应该使用高级报告器(即Graphite报告器)。 如果您不熟悉指标,那么建议您阅读入门指南 。

让我们开始吧。

扩展ThreadPoolExecutor

我们将使用ThreadPoolExecutor作为新类型的基类。 我们将其称为MonitoredThreadPoolExecutor 。 此类将接受MetricRegistry作为其构造函数参数之一–

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.metricRegistry = metricRegistry;}
}

注册仪表以测量特定于池的参数

量表是一个值的瞬时量度。 我们将使用它来测量不同的池参数,例如活动线程数,任务队列大小等。

在注册仪表之前,我们需要确定如何为线程池计算指标名称。 每个度量标准,无论是仪表,计时器还是仪表,都有一个唯一的名称。 此名称用于标识度量标准来源。 此处的约定是使用点分字符串,该点分字符串通常由要监视的类的完全限定名称构成。

对于我们的线程池,我们将使用其完全限定名称作为指标名称的前缀。 另外,我们将添加另一个名为
poolName,客户端将使用它来指定特定于实例的标识符。

实施这些更改后,该类如下所示–

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;private final String metricsPrefix;public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry,String poolName) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;this.metricsPrefix = MetricRegistry.name(getClass(), poolName);}// Rest of the constructors
}

现在我们准备注册我们的仪表。 为此,我们将定义一个私有方法–

private void registerGauges() {metricRegistry.register(MetricRegistry.name(metricsPrefix, "corePoolSize"), (Gauge<Integer>) this::getCorePoolSize);metricRegistry.register(MetricRegistry.name(metricsPrefix, "activeThreads"), (Gauge<Integer>) this::getActiveCount);metricRegistry.register(MetricRegistry.name(metricsPrefix, "maxPoolSize"), (Gauge<Integer>) this::getMaximumPoolSize);metricRegistry.register(MetricRegistry.name(metricsPrefix, "queueSize"), (Gauge<Integer>) () -> getQueue().size());
}

对于我们的示例,我们正在测量核心池大小,活动线程数,最大池大小和任务队列大小。 根据监视要求,我们可以注册更多/更少的量规来测量不同的属性。

现在,所有构造函数都将调用此私有方法–

public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry,String poolName
) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;this.metricsPrefix = MetricRegistry.name(getClass(), poolName);registerGauges();
}

测量任务执行时间

为了衡量任务执行时间,我们将覆盖ThreadPoolExecutor提供的两个生命周期方法– beforeExecuteafterExecute

顾名思义,执行任务之前,将由执行任务的线程调用beforeExecute回调。 此回调的默认实现不执行任何操作。

同样,在执行每个任务之后,执行任务的线程将调用afterExecute回调。 此回调的默认实现也不执行任何操作。 即使任务抛出未捕获的RuntimeExceptionError ,也会调用此回调。

我们将在beforeExecute覆盖中启动一个Timer ,然后将其用于afterExecute覆盖中以获取总的任务执行时间。 为了存储对Timer的引用,我们将在类中引入一个新的ThreadLocal字段。

回调的实现如下:

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;private final String metricsPrefix;private ThreadLocal<Timer.Context> taskExecutionTimer = new ThreadLocal<>();// Constructors@Overrideprotected void beforeExecute(Thread thread, Runnable task) {super.beforeExecute(thread, task);Timer timer = metricRegistry.timer(MetricRegistry.name(metricsPrefix, "task-execution"));taskExecutionTimer.set(timer.time());}@Overrideprotected void afterExecute(Runnable task, Throwable throwable) {Timer.Context context = taskExecutionTimer.get();context.stop();super.afterExecute(task, throwable);}
}

记录由于未捕获的异常而导致的失败任务数

afterExecute回调的第二个参数是Throwable 。 如果非null,则此Throwable引用导致执行终止的未捕获RuntimeExceptionError 。 我们可以使用此信息来部分计算由于未捕获的异常而突然终止的任务总数。

要获得失败任务的总数,我们必须考虑另一种情况。 使用execute方法提交的任务将抛出任何未捕获的异常,并且它将用作afterExecute回调的第二个参数。 但是,执行者服务会吞下使用Submit方法提交的任务。 JavaDoc (重点是我的话)中对此做了清楚的解释-


注意:如果将动作显式地或通过诸如Submit之类的方法包含在任务(例如FutureTask)中,则这些任务对象会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给此方法。 如果您想使用此方法捕获两种类型的失败,则可以进一步探查此类情况,例如,在此示例子类中,如果任务被中止,则打印直接原因或潜在异常。 幸运的是,同一文档还为此提供了一种解决方案,即检查可运行对象以查看其是否为Future ,然后获取基础异常。

结合这些方法,我们可以如下修改afterExecute方法–

@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {Timer.Context context = taskExecutionTimer.get();context.stop();super.afterExecute(runnable, throwable);if (throwable == null && runnable instanceof Future && ((Future) runnable).isDone()) {try {((Future) runnable).get();} catch (CancellationException ce) {throwable = ce;} catch (ExecutionException ee) {throwable = ee.getCause();} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}if (throwable != null) {Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));failedTasksCounter.inc();}
}

计算成功任务的总数

先前的方法也可以用于计算成功任务的总数:完成的任务不会抛出任何异常或错误–

@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {// Rest of the method body .....if (throwable != null) {Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));failedTasksCounter.inc();} else {Counter successfulTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "successful-tasks"));successfulTasksCounter.inc();}
}

结论

在本文中,我们研究了对ExecutorService实现的一些监视友好的自定义。 像往常一样,任何建议/改进/错误修复将不胜感激。 至于示例源代码,它已上传到
Github 。

翻译自: https://www.javacodegeeks.com/2018/05/java-tips-creating-a-monitoring-friendly-executorservice.html

Java技巧:创建监视友好的ExecutorService相关推荐

  1. ExecutorService为创建的线程池ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE)

    ExecutorService就是要创建的线程池  JAVA中线程池用类ExecutorService代表 ,案例ExecutorService pool = Executors.newFixedTh ...

  2. akka actor java_Akka:使用非默认构造函数在Scala中定义一个actor并从Java代码创建它 - java...

    Akka Scala演员必须扩展akka.actor.Actor Akka Java actor必须扩展akka.actor.UntypedActor 因此,在使用非默认构造函数定义Scala act ...

  3. java闪屏怎么制作,Java Swing创建自定义闪屏:在闪屏下画进度条(一)

    Java Swing创建自定义闪屏:在闪屏上画进度条(一) 由于本人十分热爱Java Swing,所以平时闲暇之余总是喜欢极尽所能去搜藏一些自认为比较"酷"的Swing代码来研究揣 ...

  4. 翻译:通过Java编程创建X.509格式的数字签名证书

    本文翻译自此篇文章,如有余力可直接阅读原文. 我所需要解决的问题很简单:创建一个只需要配置很少字段的X.509协议的证书,再使用已有的CA私钥/证书进行签名,最后导出为PKCS12格式的签名证书.把这 ...

  5. java程序移动图形源程序_如何运用java程序设计创建移动图形

    那么这一节我们就来学习一下,如何运用java程序设计创建移动图形,希望大家能够好好学习这个课程,并且能够在这个基础上进行修改代码. 在上一节的教程中,我们学会了基本绘图方法,基本思路是先创建一个窗体框 ...

  6. JAVA中创建线程池的五种方法及比较

    之前写过JAVA中创建线程的三种方法及比较.这次来说说线程池. JAVA中创建线程池主要有两类方法,一类是通过Executors工厂类提供的方法,该类提供了4种不同的线程池可供使用.另一类是通过Thr ...

  7. Java中创建泛型数组

    Java中创建泛型数组 使用泛型时,我想很多人肯定尝试过如下的代码,去创建一个泛型数组 T[] array = new T[]; 当我们写出这样的代码时编译器会报Cannot create a gen ...

  8. java inner class,C# Inner Class vs. java 的inner class比较-JSP教程,Java技巧及代码

    作者: leafwiz www.aspcool.com 时间:2004-11-6 15:50:57 阅读次数:1811 今天朋友问到,为什么在c#中inner class不能够访问外部类的非静态成员, ...

  9. 【JVM】Java对象创建的流程步骤

    · 本文摘要 · 罗列Java创建对象的各种方式: · 讲解Java对象创建的流程步骤: 一.Java创建对象的各种方式 · 1. 用关键字new,老少皆知的方法:StringBuffer sb = ...

最新文章

  1. VTK:IO之GLTFExporter
  2. boost之lexical_cast
  3. Linux网络编程一步一步学+基础
  4. 洛谷 - P1361 小M的作物(最大流最小割)
  5. centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka
  6. Linux创建名为vgtest的分区,第五周作业,
  7. python写文件追加 按行追加_你真的会用python进行文件操作吗
  8. 云盘上传一直显示服务器出错_百度云盘一直服务器忙 百度网盘出现服务器错误...
  9. 风尘若幻_封装win7_sp3(终于可以和大家见面了,欢迎试用-谢谢支持!!!)
  10. java文件内容比较_怎么用JAVA技术编写一个两文件内容比较的程序?
  11. 七年切图仔如何面试大厂web前端?(沟通软技能总结)
  12. 几种镜像恒流源电路分析!
  13. 移动通信技术的发展历程初
  14. matlab 求解高次方程,Matlab求解多元高次方程组
  15. linux figlet 制作 banner
  16. ZOJ2477 拼魔方
  17. Java实战项目二(超详细)---奔跑吧小恐龙
  18. Misc 第七篇——base64stego(伪加密,base64隐写)
  19. 卷积自编码器CAEs
  20. FIR滤波器设计(Kaiser窗案例)

热门文章

  1. Sentinel(八)之熔断降级
  2. Maven的pom.xml文件详解------Environment Settings
  3. Linux--用SecureCRT来上传和下载文件
  4. JavaFX UI控件教程(六)之Toggle Button
  5. SSL / TLS 协议运行机制详解
  6. Java多线程:实现方式Thread与Runnable
  7. Java中关于String类型的10个问题
  8. java的for循环
  9. Java连接PostgreSQL数据库,增删改查
  10. win10控制视频声音大小