聊聊ExecutorService的监控
为什么80%的码农都做不了架构师?>>>
序
本文主要研究一下ExecutorService的监控
InstrumentedExecutorService
metrics-core-4.0.2-sources.jar!/com/codahale/metrics/InstrumentedExecutorService.java
/*** An {@link ExecutorService} that monitors the number of tasks submitted, running,* completed and also keeps a {@link Timer} for the task duration.* <p/>* It will register the metrics using the given (or auto-generated) name as classifier, e.g:* "your-executor-service.submitted", "your-executor-service.running", etc.*/
public class InstrumentedExecutorService implements ExecutorService {private static final AtomicLong NAME_COUNTER = new AtomicLong();private final ExecutorService delegate;private final Meter submitted;private final Counter running;private final Meter completed;private final Timer idle;private final Timer duration;/*** Wraps an {@link ExecutorService} uses an auto-generated default name.** @param delegate {@link ExecutorService} to wrap.* @param registry {@link MetricRegistry} that will contain the metrics.*/public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {this(delegate, registry, "instrumented-delegate-" + NAME_COUNTER.incrementAndGet());}/*** Wraps an {@link ExecutorService} with an explicit name.** @param delegate {@link ExecutorService} to wrap.* @param registry {@link MetricRegistry} that will contain the metrics.* @param name name for this executor service.*/public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {this.delegate = delegate;this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));this.running = registry.counter(MetricRegistry.name(name, "running"));this.completed = registry.meter(MetricRegistry.name(name, "completed"));this.idle = registry.timer(MetricRegistry.name(name, "idle"));this.duration = registry.timer(MetricRegistry.name(name, "duration"));}@Overridepublic void execute(Runnable runnable) {submitted.mark();delegate.execute(new InstrumentedRunnable(runnable));}@Overridepublic Future<?> submit(Runnable runnable) {submitted.mark();return delegate.submit(new InstrumentedRunnable(runnable));}@Overridepublic <T> Future<T> submit(Runnable runnable, T result) {submitted.mark();return delegate.submit(new InstrumentedRunnable(runnable), result);}@Overridepublic <T> Future<T> submit(Callable<T> task) {submitted.mark();return delegate.submit(new InstrumentedCallable<>(task));}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAll(instrumented);}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAll(instrumented, timeout, unit);}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws ExecutionException, InterruptedException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAny(instrumented);}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {submitted.mark(tasks.size());Collection<? extends Callable<T>> instrumented = instrument(tasks);return delegate.invokeAny(instrumented, timeout, unit);}private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Callable<T>> tasks) {final List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());for (Callable<T> task : tasks) {instrumented.add(new InstrumentedCallable<>(task));}return instrumented;}@Overridepublic void shutdown() {delegate.shutdown();}@Overridepublic List<Runnable> shutdownNow() {return delegate.shutdownNow();}@Overridepublic boolean isShutdown() {return delegate.isShutdown();}@Overridepublic boolean isTerminated() {return delegate.isTerminated();}@Overridepublic boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {return delegate.awaitTermination(l, timeUnit);}//......
}
- InstrumentedExecutorService实现了ExecutorService,对jdk原始的ExecutorService进行了包装,对相应的方法织入指标统计
- 主要统计了已提交的任务submitted(
Meter
),运行中的任务running(Counter
),完成的任务completed(Meter
),空闲时长idle(Timer
),运行时长duration(Timer
) - 为了统计后面几个指标,需要对Runnable以及Callable进行织入,因而引入了InstrumentedRunnable、InstrumentedCallable
InstrumentedRunnable
private class InstrumentedRunnable implements Runnable {private final Runnable task;private final Timer.Context idleContext;InstrumentedRunnable(Runnable task) {this.task = task;this.idleContext = idle.time();}@Overridepublic void run() {idleContext.stop();running.inc();final Timer.Context durationContext = duration.time();try {task.run();} finally {durationContext.stop();running.dec();completed.mark();}}}
- 织入了对idle、duration、running、completed的统计
InstrumentedCallable
private class InstrumentedCallable<T> implements Callable<T> {private final Callable<T> callable;InstrumentedCallable(Callable<T> callable) {this.callable = callable;}@Overridepublic T call() throws Exception {running.inc();final Timer.Context context = duration.time();try {return callable.call();} finally {context.stop();running.dec();completed.mark();}}}
- 织入了对duration、running、completed的统计
ExecutorServiceMetrics
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java
/*** Monitors the status of executor service pools. Does not record timings on operations executed in the {@link ExecutorService},* as this requires the instance to be wrapped. Timings are provided separately by wrapping the executor service* with {@link TimedExecutorService}.** @author Jon Schneider* @author Clint Checketts*/
@NonNullApi
@NonNullFields
public class ExecutorServiceMetrics implements MeterBinder {@Nullableprivate final ExecutorService executorService;private final Iterable<Tag> tags;public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName, Iterable<Tag> tags) {this.executorService = executorService;this.tags = Tags.concat(tags, "name", executorServiceName);}//....../*** Record metrics on the use of an {@link Executor}.** @param registry The registry to bind metrics to.* @param executor The executor to instrument.* @param executorName Will be used to tag metrics with "name".* @param tags Tags to apply to all recorded metrics.* @return The instrumented executor, proxied.*/public static Executor monitor(MeterRegistry registry, Executor executor, String executorName, Iterable<Tag> tags) {if (executor instanceof ExecutorService) {return monitor(registry, (ExecutorService) executor, executorName, tags);}return new TimedExecutor(registry, executor, executorName, tags);}/*** Record metrics on the use of an {@link ExecutorService}.** @param registry The registry to bind metrics to.* @param executor The executor to instrument.* @param executorServiceName Will be used to tag metrics with "name".* @param tags Tags to apply to all recorded metrics.* @return The instrumented executor, proxied.*/public static ExecutorService monitor(MeterRegistry registry, ExecutorService executor, String executorServiceName, Iterable<Tag> tags) {new ExecutorServiceMetrics(executor, executorServiceName, tags).bindTo(registry);return new TimedExecutorService(registry, executor, executorServiceName, tags);}@Overridepublic void bindTo(MeterRegistry registry) {if (executorService == null) {return;}String className = executorService.getClass().getName();if (executorService instanceof ThreadPoolExecutor) {monitor(registry, (ThreadPoolExecutor) executorService);} else if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));} else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));} else if (executorService instanceof ForkJoinPool) {monitor(registry, (ForkJoinPool) executorService);}}private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {if (tp == null) {return;}FunctionCounter.builder("executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount).tags(tags).description("The approximate total number of tasks that have completed execution").baseUnit("tasks").register(registry);Gauge.builder("executor.active", tp, ThreadPoolExecutor::getActiveCount).tags(tags).description("The approximate number of threads that are actively executing tasks").baseUnit("threads").register(registry);Gauge.builder("executor.queued", tp, tpRef -> tpRef.getQueue().size()).tags(tags).description("The approximate number of threads that are queued for execution").baseUnit("threads").register(registry);Gauge.builder("executor.pool.size", tp, ThreadPoolExecutor::getPoolSize).tags(tags).description("The current number of threads in the pool").baseUnit("threads").register(registry);}//......
}
- ExecutorServiceMetrics实现了MeterBinder接口,另外提供了静态方法来创建带有监控指标的ExecutorService,该静态方法命名为monitor,非常形象
- monitor方法首先创建ExecutorServiceMetrics,并bindTo了MeterRegistry,然后返回TimedExecutorService
- bindTo方法上报了executor.completed(
FunctionCounter
),executor.active(Gauge
),executor.queued(Gauge
),executor.pool.size(Gauge
)这几个指标
TimedExecutorService
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/internal/TimedExecutorService.java
/*** An {@link java.util.concurrent.ExecutorService} that is timed** @author Jon Schneider*/
public class TimedExecutorService implements ExecutorService {private final ExecutorService delegate;private final Timer timer;public TimedExecutorService(MeterRegistry registry, ExecutorService delegate, String executorServiceName, Iterable<Tag> tags) {this.delegate = delegate;this.timer = registry.timer("executor", Tags.concat(tags ,"name", executorServiceName));}@Overridepublic void shutdown() {delegate.shutdown();}@Overridepublic List<Runnable> shutdownNow() {return delegate.shutdownNow();}@Overridepublic boolean isShutdown() {return delegate.isShutdown();}@Overridepublic boolean isTerminated() {return delegate.isTerminated();}@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return delegate.awaitTermination(timeout, unit);}@Overridepublic <T> Future<T> submit(Callable<T> task) {return delegate.submit(timer.wrap(task));}@Overridepublic <T> Future<T> submit(Runnable task, T result) {return delegate.submit(() -> timer.record(task), result);}@Overridepublic Future<?> submit(Runnable task) {return delegate.submit(() -> timer.record(task));}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {return delegate.invokeAll(wrapAll(tasks));}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {return delegate.invokeAll(wrapAll(tasks), timeout, unit);}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {return delegate.invokeAny(wrapAll(tasks));}@Overridepublic <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return delegate.invokeAny(wrapAll(tasks), timeout, unit);}@Overridepublic void execute(Runnable command) {delegate.execute(timer.wrap(command));}private <T> Collection<? extends Callable<T>> wrapAll(Collection<? extends Callable<T>> tasks) {return tasks.stream().map(timer::wrap).collect(toList());}
}
- 对ExecutorService进行包装,增加了
Timer.record
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/Timer.java
/*** Executes the runnable `f` and records the time taken.** @param f Function to execute and measure the execution time.*/void record(Runnable f);/*** Wrap a {@link Runnable} so that it is timed when invoked.** @param f The Runnable to time when it is invoked.* @return The wrapped Runnable.*/default Runnable wrap(Runnable f) {return () -> record(f);}/*** Wrap a {@link Callable} so that it is timed when invoked.** @param f The Callable to time when it is invoked.* @param <T> The return type of the callable.* @return The wrapped callable.*/default <T> Callable<T> wrap(Callable<T> f) {return () -> recordCallable(f);}
- warp方法主要是包装调用record方法,而record由实现类去实现
AbstractTimer
micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/AbstractTimer.java
@Overridepublic void record(Runnable f) {final long s = clock.monotonicTime();try {f.run();} finally {final long e = clock.monotonicTime();record(e - s, TimeUnit.NANOSECONDS);}}@Overridepublic final void record(long amount, TimeUnit unit) {if (amount >= 0) {histogram.recordLong(TimeUnit.NANOSECONDS.convert(amount, unit));recordNonNegative(amount, unit);if (intervalEstimator != null) {intervalEstimator.recordInterval(clock.monotonicTime());}}}
- record采用histogram进行统计
小结
dropwizard及micrometer均提供了对ExecutorService的指标统计的包装,micrometer则更近一步提供了静态方法来直接创建,非常方便。
doc
- InstrumentedExecutorService
- ExecutorServiceMetrics
转载于:https://my.oschina.net/go4it/blog/2223506
聊聊ExecutorService的监控相关推荐
- 聊聊AIOps落地监控报警的应对之策
作者简介 周伟 百度高级研发工程师 负责百度智能运维(Noah)监控报警系统.通告平台:在精准报警.精准通告.报警收敛.公/私有云监控等方向具有广泛的实践经验. 干货概览 监控报警是故障发现的重 ...
- 普罗米修斯监控系统_基于Prometheus和Grafana的监控平台 - 环境搭建
导读 微服务中的监控分根据作用领域分为三大类,Logging,Tracing,Metrics. Logging - 用于记录离散的事件.例如,应用程序的调试信息或错误信息.它是我们诊断问题的依据.比如 ...
- html的css怎么设置深度,vue css 深度选择器
Conquer and Divide经典例子之Strassen算法解决大型矩阵的相乘 在通过汉诺塔问题理解递归的精髓中我讲解了怎么把一个复杂的问题一步步recursively划分了成简单显而易见的小问 ...
- 知了 | 基于NLP的智能问答推荐系统
作者简介 苗贝贝 百度高级研发工程师 负责百度智能运维客服平台ChatOps,在时序数据异常检测.文本模式识别.相似度网络等方向也有广泛的实践经验. 干货概览 通常,客服系统主要有两种应答模式: ...
- 超级玛丽马里奥版下载_将超级马里奥赋予生命
超级玛丽马里奥版下载 Have you ever seen a zoetrope? If today's sophisticated computer animation is the latest ...
- 装在笔记本里的私有云环境:准备篇
接下来我计划写一个小系列,聊聊如何把一个简化过的私有云环境部署在笔记本里,以满足低成本.低功耗.低延时的实验环境. 过程中,将尽可能使用主流的开源软件和技术栈来完成功能,尽量保持日常使用的云服务的基础 ...
- 【原】聊聊js代码异常监控
[原]聊聊js代码异常监控 参考文章: (1)[原]聊聊js代码异常监控 (2)https://www.cnblogs.com/xianyulaodi/p/6201829.html (3)https: ...
- 聊聊Spring Boot服务监控,健康检查,线程信息,JVM堆信息,指标收集,运行情况监控等!...
来自:https://juejin.im/post/5e2179def265da3e152d2561 前言 去年我们项目做了微服务1.0的架构转型,但是服务监控这块却没有跟上.这不,最近我就被分配了要 ...
- 实现pv uv统计_聊聊前端监控(二)--行为监控的技术实现
上一篇梳理了前端监控的主要场景和类型,从本文开始,讨论下我知道的一些技术实现.前端黑科技层出不穷,个人眼界有限,尽量把了解到的实现方式都罗列出来,希望对大家有些启发,同时也欢迎流言讨论. 限于篇幅,按 ...
最新文章
- 图灵奖得主高德纳与 LaTex 有啥关系?90%的人都不知道
- 158行Python代码复现:DeepMind提图像生成的递归神经网络DRAW
- c++ 全局变量初始化的一点总结
- 疯狂java讲义之流程控制与数组
- 域名解析的记录类型区别
- 【深入Java虚拟机JVM 07】JVM如何判断对象已死
- 大锅菜机器人_炒菜机器人——烹饪界的一场革命
- 第十二届交博会正式启动 百度智慧交管解决方案助城市开启智能交通新纪元
- 【转载】Elasticsearch客户端API使用Demo
- 如何在Linux中找到您的IP地址
- DROP TABLE ** CASCADE CONSTRAINTS PURGE删除表的时候级联删除从表外键
- 基于深度学习模型的麻蕉疾病自动识别(增加形态计量和几何分析)
- 工行基于MySQL构建分布式架构的转型之路
- 50套3dmax家具建模详细教程 3dmax床建模教程丨3Dmax基础教程3dmax教学3dmax室内设计教程
- 计算机基础知识(上)(硬件篇)
- 角点(corner point)、关键点(key point)、特征点(feature point)概念辨析
- 当BTC大空头遇上PlusToken,投资竟然成为一门玄学?
- Linux下MySQL 5.7在线镜像安装
- oracle插入表当前时间,ORACLE自动插入当前时间
- 数据结构与算法Python版之北大慕课笔记(五)
热门文章
- mongodb的返回(2)
- WebApp的前端所遇问题
- [041] 微信公众帐号开发教程第17篇-应用实例之智能翻译
- (转) PowerDesigner中Table视图同时显示Code和Name
- 【XDA汉化组编写】Android软件汉化/精简/去广告/优化教程 FAQ
- nagios远程系统监测服务
- 错误:docker-ce-cli conflicts with 2:docker-1.13.1-103.git7f2769b.el7.centos.x86_64 错误:docker-ce confli
- 实现光晕效果_马自达6车灯升级激光四透镜实现四近四远光
- python 读取word_教你怎么使用 Python 对 word文档 进行操作
- ebs oracle 落伍了吗_向不了解oracle EBS的人介绍Oracle EBS