Java的executorservice_ExecutorService-10个要诀和技巧
原文链接 作者:Tomasz Nurkiewicz 译者:simonwang
ExecutorService抽象概念自Java5就已经提出来了,现在是2014年。顺便提醒一下:Java5和Java6都已不被支持,Java7在半年内也将会这样。我提出这个的原因是许多Java程序员仍然不能完全明白ExecutorService到底是怎样工作的。还有很多地方要去学习,今天我会分享一些很少人知道的特性和实践。然而这篇文章仍然是面向中等程序员的,没什么特别高级的地方。
1. Name pool threads
我想强调一点的是,当在运行JVM或调试期间创建线程时,默认的线程池命名规则是pool-N-thread-M,这里N代表线程池的序列数(每一次你创建一个线程池的时候,全局计数N就加1),而M则是某一个线程池的线程序列数。例如,pool-2-thread-3就意味着JVM生命周期中第2线程池的第3线程。具体可以查看:Executors.defaultThreadFactory()。这样不具备描述性,JDK使得线程命名的过程有些微的复杂,因为命名的方法隐藏在ThreadFactory内部。幸运地是Guava有一个很有用的类:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d")
.setDaemon(true)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
线程池默认创造的是非守护线程,由你来决定是否合适。
2. Switch names according to context
有一个我从Supercharged jstack: How to Debug Your Servers at 100mph学到的小技巧。一旦我们记住了线程的名字,那么在任何时刻我们都能够改变它们!这是有道理的,因为线程转储显示了类名和方法名,没有参数和局部变量。通过调整线程名保留一些必要的事务标识符,我们可以很容易追踪某一条运行缓慢或者造成死锁的信息/记录/查询等。例如:
private void process(String messageId) {
executorService.submit(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + messageId);
try {
//real logic here...
} finally {
currentThread.setName(oldName);
}
});
}
在try-finally块内部,当前线程被命名为Processing-WHATEVER-MESSAGE-ID-IS,当通过系统追踪信息流时这可能会派上用场。
3. Explicit and safe shutdown
在客户端线程和线程池之间有一个任务队列,当你的应用关闭时,你必须关心两件事:任务队列会发生什么;正在运行的任务会怎样(这个时候将详细介绍)。令人感到吃惊的是许多程序员并不会适当地或有意识地关闭线程池。这有两个方法:要么让所有的任务队列全都执行完(shutdown()),要么舍弃它们(shutdownNow()),这依赖你使用的具体情况。例如如果我们提交一连串的任务并且想要它们在完成后尽可能快的返回,可以使用shutdown():
private void sendAllEmails(List emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.debug("All e-mails were sent so far? {}", done);
}
在这个例子中我们发送了一堆e-mail,每一个都作为一个独立的任务交给线程池。在提交了所有的任务之后我们执行shutdown使线程池不再接收新的任务。然后最多等待1minute直到所有的任务都完成。然而如果有些任务仍然处于挂起状态,awaitTermination()将返回false,而那些在等待的任务会继续执行。我知道一些人会使用新潮的用法:
emails.parallelStream().forEach(this::sendEmail);
你可能会觉得我太保守,但我喜欢去控制并行线程的数量。不用介意,还有一种优雅的shutdown()方法shutdownNow():
final List rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
这样一来队列中还在等待的任务将会被舍弃并被返回,但已经在运行的任务将会继续。
4. Handle interruption with care
很少人知道Future接口的cancel,这里我不想重复说明,你可以去看我以前的文章:
5. Monitor queue length and keep it bounded
不合适的线程池大小可能会造成运行缓慢、不稳定以及内存泄漏。如果你配置太少的线程,那么任务队列就会变大,消耗太多内存。另一方面太多的线程又会由于过度频繁的上下文切换而造成整个系统运行缓慢。所以观察队列的长度并将其限定在一定范围内是很重要的,这样的话过载的线程池会短暂拒绝新任务的提交:
final BlockingQueue queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
上面的代码和Executors.newFixedThreadPool(n)是等价的,然而不同的是默认情况下固定线程池使用的是无限制的LinkedBlockingQueue ,我们使用的是固定容量100的ArrayBlockingQueue。这就意味着如果已经有100个任务在排队(其中有n个任务正在执行),那么新的任务就将被驳回并抛出RejectedExecutionException。一旦在外部可以访问queue ,那么我们就可以周期性地调用size(),并把它提交到logs/JMX或其他任何你使用的监视器中。
6. Remember about exception handling
下面代码段的结果是什么?
executorService.submit(() -> {
System.out.println(1 / 0);
});
我深受其苦:它不会打印任何东西。不会抛出java.lang.ArithmeticException: / by zero,什么也没有。线程池将忽略这个异常,就像它从来没发生过。如果上面的代码是用java.lang.Thread偶然创造的,那么UncaughtExceptionHandler可能会起作用。但在线程池里你就要多加小心了。如果你正在提交Runnable (没有返回结果,就像上面),那么你必须将整个代码块用try-catch包起来,至少要log一下。如果你提交的是Callable,确保你总是使用阻塞的get()方法来重抛异常:
final Future division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();
有趣的是就算是Spring框架在处理这个bug的时候会使用@Async,详细:SPR-8995和SPR-12090。
7. Monitor waiting time in a queue
监控工作队列深度又是一个层面,在排除单个事务或任务的故障时,有必要了解从任务的提交到实际执行耗时多长。这种等待时间最好趋近于零(当线程池中有空闲的线程时),但任务又不得不在队列中排队导致等待时间变长。而且如果池内没有一定数量的线程,在运行新任务时可能需要创造新的线程,而这个过程也是要消耗少量时间的。为了能够清楚地监测这个时间,我们使用类似下面的代码包装原始的ExecutorService :
public class WaitTimeMonitoringExecutorService implements ExecutorService {
private final ExecutorService target;
public WaitTimeMonitoringExecutorService(ExecutorService target) {
this.target = target;
}
@Override
public Future submit(Callable task) {
final long startTime = System.currentTimeMillis();
return target.submit(() -> {
final long queueDuration = System.currentTimeMillis() - startTime;
log.debug("Task {} spent {}ms in queue", task, queueDuration);
return task.call();
}
);
}
@Override
public Future submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}
@Override
public Future> submit(Runnable task) {
return submit(new Callable() {
@Override
public Void call() throws Exception {
task.run();
return null;
}
});
}
//...
}
这并不是完整的实现,但你得知道这个基本概念。当我们向线程池提交任务的那一刻,就立马开始测量时间,而任务一开始被执行就停止测量。不要被上面源码中很接近的startTime 和queueDuration 所迷惑了,事实上这两行是在不同的线程中执行的,可能有数毫秒甚至数秒的差别,例如:
Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
8. Preserve client stack trace
响应式编程这段日子似乎比较火,Reactive manifesto,reactive streams,RxJava(刚刚发布1.0),Clojure agents,scala.rx…,这些东西都挺好的,但它们的堆栈跟踪将不再友好,大多数堆栈跟踪没有什么卵用。举个例子,当线程池中的任务抛出了一个异常:
java.lang.NullPointerException: null
at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
我们很容易就发现MyTask在76行抛出了空指针异常,但我们并不知道是谁提交了这个任务,因为堆栈跟踪仅仅只是告诉你Thread和 ThreadPoolExecutor的信息。我们能通过源码从技术上定位MyTask被创造的位置,不需要线程(更不必说事件驱动、响应式编程)我们就能够马上看到全面信息。如果我们保留客户端代码(提交任务的代码)的堆栈跟踪并在出现故障的时候将其打印出来会怎么样?这不是什么新想法,例如Hazelcast会将当前点发生的异常传送回客户端代码,下面就看看保持客户端堆栈跟踪是怎样实现的:
public class ExecutorServiceWithClientTrace implements ExecutorService {
protected final ExecutorService target;
public ExecutorServiceWithClientTrace(ExecutorService target) {
this.target = target;
}
@Override
public Future submit(Callable task) {
return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private Callable wrap(final Callable task, final Exception clientStack, String clientThreadName) {
return () -> {
try {
return task.call();
} catch (Exception e) {
log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
throw e;
}
};
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
@Override
public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
return tasks.stream().map(this::submit).collect(toList());
}
//...
}
这次一旦出现异常我们将检索任务被提交地方的所有堆栈跟踪和线程名,和标准异常相比下面的异常信息更有价值:
Exception java.lang.NullPointerException in task submitted from thrad main here:
java.lang.Exception: Client stack trace
at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9. Prefer CompletableFuture
Java 8提出了强大的CompletableFuture,请尽可能的使用它。ExecutorService并没有扩展支持这个强大的抽象,所以你要小心使用它。用:
final CompletableFuture future =
CompletableFuture.supplyAsync(this::calculate, executorService);
代替:
final Future future =
executorService.submit(this::calculate);
CompletableFuture继承了Future及其所有功能,而且CompletableFuture所提供的扩展功能极大地丰富了我们的API。
10. Synchronous queue
SynchronousQueue是一种有趣的BlockingQueue但真正意义上并不是queue,事实上它连数据结构都算不上。要解释的话它算是0容量的队列,引用JavaDoc:
each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.
这和线程池有什么关系呢?试着在ThreadPoolExecutor中使用SynchronousQueue:
BlockingQueue queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
queue);
我们创造了有两个线程的线程池和一个SynchronousQueue,因为SynchronousQueue本质上是零容量的队列,因此如果有空闲线程,ExecutorService只会执行新的任务。如果所有的线程都被占用,新任务会被立刻拒绝不会等待。当进程背景要求立刻启动或者被丢弃时,这种机制是可取的。
以上,希望你们能够找到至少一个有用的!
Java的executorservice_ExecutorService-10个要诀和技巧相关推荐
- 学java的正确方法_学习Java编程 这10个技巧不容错过--中享思途
这是一个国外大神20多年的经验总结出来的-- "任何可能出错的事情,最后都会出错." 这就是人们为什么喜欢进行"防错性程序设计"的原因.偏执的习惯有时很有意义, ...
- Java 中代码优化的 30 个小技巧(中)
11 位运算效率更高 如果你读过 JDK 的源码,比如 ThreadLocal.HashMap 等类,你就会发现,它们的底层都用了位运算. 为什么开发 JDK 的大神们,都喜欢用位运算? 答:因为位运 ...
- Java 中代码优化的 30 个小技巧(下)
21 防止死循环 有些小伙伴看到这个标题,可能会感到有点意外,代码中不是应该避免死循环吗?为啥还是会产生死循环? 殊不知有些死循环是我们自己写的,例如下面这段代码: while(true) {if(c ...
- 一周极客热文:Java开发的10位牛人
1983年,Gosling获得了卡尔加里大学的计算机科学学士学位.1990年,他获得了卡内基梅隆大学的计算机科学博士学位,师从Bob Sproull.在攻读博士期间,他自己开发了一款emacs,叫Go ...
- 聊聊我们Java中代码优化的30个小技巧
今天我们一起聊聊Java中代码优化的30个小技巧,希望会对你有所帮助. 1.用String.format拼接字符串 不知道你有没有拼接过字符串,特别是那种有多个参数,字符串比较长的情况. 比如现在有个 ...
- Java 11手册:最聪明的技巧来简化Java 11导航
Java 11:提示和技巧,日常陷阱及更多 为了庆祝Java 11的发布,我们邀请了八位Java专家与他们分享最新版本的最佳和最差体验. 由于本系列旨在作为Java 11的手册,因此我们的受访者还将谈 ...
- Java中代码优化的30个小技巧
1.用String.format拼接字符串 String.format方法拼接url请求参数,日志打印等字符串. 但不建议在for循环中用它拼接字符串,因为它的执行效率,比使用+号拼接字符串,或者使用 ...
- Java 中代码优化的 30 个小技巧(上)
前言 今天我们一起聊聊 Java 中代码优化的 30 个小技巧,希望会对你有所帮助. 1 用 String.format 拼接字符串 不知道你有没有拼接过字符串,特别是那种有多个参数,字符串比较长的情 ...
- Java字符串的10大热点问题盘点
Java字符串的10大热点问题盘点 下面我为大家总结了10条Java开发者经常会提的关于Java字符串的问题,如果你也是Java初学者,仔细看看吧: 1.如何比较字符串,应该用"==&quo ...
- Java内存管理的9个小技巧
1.最基本的建议是尽早释放无用对象的引用.如: ... A a = new A(); //应用a对象 a = null; //当使用对象a之后主动将其设置为空 -. 注:如果a 是方法的返 ...
最新文章
- 三菱modbusRTU通讯实例_「笔记」信捷plc应用,两个plc通讯篇
- InheritParasitic.js
- 使用RDLC报表向报表传入参数
- oracle游标的说法,oracle游标练习题.doc
- linux内核的反复--一切都是过程
- 什么是MSTP(多业务传输平台)?
- 如何简单快速调试高大上的谷歌浏览器
- prism项目搭建 wpf_Prism 源码解读1-Bootstrapper和Region的创建
- oracle对象依赖关系图,Oracle concepts 学习笔记(4)——Schema对象间的依赖关系
- 28. JavaScript 库
- 油猴(Tampermonkey)安装教程
- android 动态获取权限
- (6)微信UI自动化-搜索指定联系人(C#)
- BUUCTF_Misc题目题解记录
- win7自动锁定计算机快捷键,两种方法教你锁定Win7系统电脑计算机快捷键
- 安装office时总得到“安装程序包的语言不受系统支持”的提示解决方法
- 推荐书籍:RNA甲基化表观转录组学
- 戴尔笔记本重装系统按f几进入
- 3dMax夜晚行车灯光轨迹一键生成插件TrafficTrails使用教程
- [HY000][1822] Failed to add the foreign key constraint. Missing index for constraint ‘fk_com’