Java_Java多线程_Java线程池核心参数 与 手动创建线程池
参考文章:
1.浅谈线程池ThreadPoolExecutor核心参数
https://www.cnblogs.com/stupid-chan/p/9991307.html
2.Java线程池 ThreadPoolExecutor(一)线程池的核心方法以及原理
https://blog.csdn.net/m0_37506254/article/details/90574038
3.Java 中的几种线程池,你之前用对了吗
https://www.cnblogs.com/fengzheng/p/9297602.html
4.线程池异常处理之重启线程处理任务
https://www.cnblogs.com/hapjin/p/10240863.html
整理下线程池的相关知识。阿里巴巴的规范是不允许使用Java提供的 Executors 返回的线程池,因为默认的线程池都存在一定的问题。本文主要从以下几个方面进行总结
1.默认线程池的问题
2.线程池的核心参数
3.线程池的相关问题
4.手动创建线程池
默认线程池的问题
如果使用 Executors 去创建线程池,使用阿里巴巴的插件会自动进行提示,
提示如下 :
说明 Java,默认提供的4种线程池创建方式都是不安全的。先看下默认的线程池创建方式的问题:
单线程线程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
/*** Creates an Executor that uses a single worker thread operating* off an unbounded queue, and uses the provided ThreadFactory to* create a new thread when needed. Unlike the otherwise* equivalent {@code newFixedThreadPool(1, threadFactory)} the* returned executor is guaranteed not to be reconfigurable to use* additional threads.** @param threadFactory the factory to use when creating new* threads** @return the newly created single-threaded Executor* @throws NullPointerException if threadFactory is null*/public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}
再向下跟踪
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters and default rejected execution handler.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
可以看到内部调用了线程池的核心创建方法,newSingleThreadExecutor 创建出来的单线程线程池 最主要的问题,是因为使用了 new LinkedBlockingQueue<Runnable>() 作为等待队列,该队列为无界队列,会导致堆积大量请求线程,从而导致OOM.
固定大小线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10,Executors.defaultThreadFactory());
/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue, using the provided* ThreadFactory to create new threads when needed. At any point,* at most {@code nThreads} threads will be active processing* tasks. If additional tasks are submitted when all threads are* active, they will wait in the queue until a thread is* available. If any thread terminates due to a failure during* execution prior to shutdown, a new one will take its place if* needed to execute subsequent tasks. The threads in the pool will* exist until it is explicitly {@link ExecutorService#shutdown* shutdown}.** @param nThreads the number of threads in the pool* @param threadFactory the factory to use when creating new threads* @return the newly created thread pool* @throws NullPointerException if threadFactory is null* @throws IllegalArgumentException if {@code nThreads <= 0}*/public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);
也和单线程线程池一样的问题, 是因为使用了 new LinkedBlockingQueue<Runnable>() 作为等待队列,该队列为无界队列,会导致堆积大量请求线程,从而导致OOM.
缓存型线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
向下跟踪
/*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available, and uses the provided* ThreadFactory to create new threads when needed.* @param threadFactory the factory to use when creating new threads* @return the newly created thread pool* @throws NullPointerException if threadFactory is null*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
线程池的最大线程大小 max 为 Integer 上限,会创建大量的等待线程,从而引发OOM
延迟执行线程池
public void scheduleThreadPool() throws Exception{ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10, Executors.defaultThreadFactory());scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {System.out.println("666" + new Date());}}, 4, TimeUnit.SECONDS);scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {System.out.println("777" + new Date());}}, 1, 4, TimeUnit.SECONDS);Thread.sleep(1000 * 60);scheduledExecutorService.shutdown(); }
向下跟踪代码:
/*** Creates a thread pool that can schedule commands to run after a* given delay, or to execute periodically.* @param corePoolSize the number of threads to keep in the pool,* even if they are idle* @param threadFactory the factory to use when the executor* creates a new thread* @return a newly created scheduled thread pool* @throws IllegalArgumentException if {@code corePoolSize < 0}* @throws NullPointerException if threadFactory is null*/public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}
跟踪 ScheduledThreadPoolExecutor(corePoolSize, threadFactory); 构造方法:
/*** Creates a new {@code ScheduledThreadPoolExecutor} with the* given core pool size.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @throws IllegalArgumentException if {@code corePoolSize < 0}*/public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
最后看到 线程池的最大线程大小 max 为 Integer 上限,会创建大量的等待线程,从而引发OOM
结论:
说明默认的4种线程池都多多少少存在问题 !!
============================
线程池的核心参数
看到上面的默认线程池都用到了 ThreadPoolExecutor 这个类,这个类也是手动创建线程的核心类
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
看下最后的最终构造函数:
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
线程池的核心参数为以下7个
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
下面一一进行解释
int corePoolSize
核心线程数,当有任务进来的时候,如果当前线程数还未达到 corePoolSize 个数,则创建核心线。
默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,
核心线程有几个特点:
1、当线程数未达到核心线程最大值的时候,新任务进来,即使有空闲线程,也不会复用,仍然新建核心线程;
2、核心线程一般不会被销毁,即使是空闲的状态,但是如果通过方法 allowCoreThreadTimeOut(boolean value) 设置为 true 时,超时也同样会被销毁;
3、生产环境首次初始化的时候,可以调用 prestartCoreThread() / prestartAllCoreThreads() 方法 ,来预先创建所有核心线程,避免第一次调用缓慢;
int maximumPoolSize
除了有核心线程外,有些策略是当核心线程占满(无空闲)的时候,还会创建一些临时的线程来处理任务,maximumPoolSize 就是核心线程 + 临时线程的最大上限。临时线程有一个超时机制,超过了设置的空闲时间没有事儿干,就会被销毁
long keepAliveTime
表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。
但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
TimeUnit unit
参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
BlockingQueue<Runnable> workQueue
一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响。
队列分为有界队列和无界队列。
有界队列:队列的长度有上限,当核心线程满载的时候,新任务进来进入队列,当达到上限,有没有核心线程去即时取走处理,这个时候,就会创建临时线程。(警惕临时线程无限增加的风险)
无界队列:队列没有上限的,当没有核心线程空闲的时候,新来的任务可以无止境的向队列中添加,而永远也不会创建临时线程。(警惕任务队列无限堆积的风险)
除此之外,这里的阻塞队列有以下几种选择:
1、ArrayBlockingQueue:基于数组的先进先出,创建时必须指定大小,超出直接corePoolSize个任务,则加入到该队列中,只能加该queue设置的大小,其余的任务则创建线程,直到(corePoolSize+新建线程)> maximumPoolSize。
2、LinkedBlockingQueue:基于链表的先进先出,无界队列。超出直接corePoolSize个任务,则加入到该队列中,直到资源耗尽。
3、SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue 和 Synchronous。线程池的排队策略与BlockingQueue有关。
ThreadFactory threadFactory
它是一个接口,用于实现生成线程的方式、定义线程名格式、是否后台执行等等.
可以用 Executors.defaultThreadFactory() 默认的实现即可,
也可以用 Guava 等三方库提供的方法实现,
如果有特殊要求的话可以自己定义。它最重要的地方应该就是定义线程名称的格式,便于排查问题了吧
RejectedExecutionHandler handler
当没有空闲的线程处理任务,并且等待队列已满(当然这只对有界队列有效),再有新任务进来的话,就要做一些取舍了,而这个参数就是指定取舍策略的,有下面四种策略可以选择:
ThreadPoolExecutor.AbortPolicy:直接抛出异常 RejectedExecutionException ,这是默认策略;
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后将新来的任务加入等待队列
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,并提供一种简单的反馈机制,可以有效防止新任务的提交。比如在 main 函数中提交线程,如果执行此策略,将有 main 线程来执行该任务
ThreadPoolExecutor.AbortPolicy:直接抛出异常 RejectedExecutionException ,这是默认策略; Java 提供的4种默认实现的线程池都是使用的这种策略。
线程池的相关问题
线程池相关方法
线程池也提供了一些相关的方法,大致如下:
execute()
submit()
shutdown()
shutdownNow()
还有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
execute()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()
shutdown() 提供一种有序的关机,会等待当前缓存队列任务全部执行完成才会关闭,但不会再接收新的任务(相对较优雅)。
shutdownNow()
shutdownNow() 会立即关闭线程池,会打断正在执行的任务并且会清空缓存队列中的任务,返回的是尚未执行的任务。
corePoolSize与maximumPoolSize关系
1、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程
2、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程
3、池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务
4、池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务
手动创建线程池
下面演示下如何手动创建线程池:
这里我们使用的 Guava 的 ThreadFactory, 相关的 pom
<dependencies><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>14.0.1</version></dependency> </dependencies>
线程池创建代码
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();ExecutorService taskExe = new ThreadPoolExecutor(1, 1, 200L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
创建了1个线程池 coreSize 1, maxSize 1, 使用有界等待队列初始大小为1, 传递Guava 创建的线程工厂(主要是为了给线程命名), 拒绝策略为直接抛出异常
测试代码
package thread.pool;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.*;/*** Created by szh on 2020/6/8.*/
public class ThreadPoolManual {public static int i = 1;public static volatile boolean flag = false;public static void main(String[] args) {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();ExecutorService taskExe = new ThreadPoolExecutor(1, 1, 200L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());taskExe.submit(new Thread(() -> {while (ThreadPoolManual.i <= 99) {if (ThreadPoolManual.flag == false) {System.out.println(Thread.currentThread().getName() + " " + i);ThreadPoolManual.i++;ThreadPoolManual.flag = true;}}}));taskExe.submit(new Thread(() -> {while (ThreadPoolManual.i <= 100) {if (ThreadPoolManual.flag == true) {System.out.println(Thread.currentThread().getName() + " " + i);ThreadPoolManual.i++;ThreadPoolManual.flag = false;}}}));taskExe.submit(new Runnable() {@Overridepublic void run() {System.out.println("xxxxx");}});}}
分析
总共3个线程,2个线程 交替打印 0~ 100, 格外并提交了1个线程用来干扰,
因为线程池当前运行一个线程1,另一个线程处于等待队列,第3个线程触发了拒绝策略。
输出
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5b480cf9 rejected from java.util.concurrent.ThreadPoolExecutor@6f496d9f[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)at thread.pool.ThreadPoolManual.main(ThreadPoolManual.java:42)
thread-call-runner-0 1
Java_Java多线程_Java线程池核心参数 与 手动创建线程池相关推荐
- 我会手动创建线程,为什么让我使用线程池?
你有一个思想,我有一个思想,我们交换后,一个人就有两个思想 If you can NOT explain it simply, you do NOT understand it well enough ...
- Java多线程学习六:使用线程池比手动创建线程好在那里以及常用线程池参数的意义
为什么要使用线程池 首先,回顾线程池的相关知识,在 Java 诞生之初是没有线程池的概念的,而是先有线程,随着线程数的不断增加,人们发现需要一个专门的类来管理它们,于是才诞生了线程池.没有线程池的时候 ...
- 【重难点】【JUC 05】线程池核心设计与实现、线程池使用了什么设计模式、要你设计的话,如何实现一个线程池
[重难点][JUC 05]线程池核心设计与实现.线程池使用了什么设计模式.要你设计的话,如何实现一个线程池 文章目录 [重难点][JUC 05]线程池核心设计与实现.线程池使用了什么设计模式.要你设计 ...
- JDBC连接池核心参数包括?(多选题)
30.JDBC连接池核心参数包括?(多选题) A. initialSize B. maxIdle C. minIdle D. maxActive E. maxWait 正确答案是:ABCDE 数据库基 ...
- 手把手教你手动创建线程池
点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:2020,搞个 Mac 玩玩!个人原创+1博客:点击前往,查看更多 作者:IamHYN 链接:https://s ...
- 阿里代码规约:手动创建线程池,效果会更好哦
项目中创建多线程时,使用常见的三种线程池创建方式,单一.可变.定长都有一定问题,原因是FixedThreadPool和SingleThreadExecutor底层都是用LinkedBlockingQu ...
- 手动创建线程池,效果会更好哦
今天在回顾线程池的创建时,使用Executors创建线程池报错了,出现了以下问题:手动创建线程池,效果会更好哦. 查阅了阿里巴巴Java开发手册 回顾一下,通过ThreadPoolExecutor来创 ...
- 手动创建线程池 效果会更好_创建更好的,可访问的焦点效果
手动创建线程池 效果会更好 Most browsers has their own default, outline style for the :focus psuedo-class. 大多数浏览器 ...
- 阿里巴巴提示:手动创建线程效果更好
原来创建方式 ExecutorService executorService = Executors.newFixedThreadPool(threadNum); 阿里的插件提示:手动创建线程效果好 ...
最新文章
- SAP Retail 事务代码WSOA1创建Assortment不能选Assortment Category !
- oracle12c多个pdb,Oracle 12c 多租户专题|12cR2中PDB内存资源管理
- Sublime Text3 如何安装、删除及更新插件
- 1043:整数大小比较
- HDU 5938 Four Operations 【字符串处理,枚举,把数字字符串变为数值】
- linux 空间不够了,怎么办?Disk Requirements:At least 11MB more space needed on the / filesystem....
- 根据wsdl文件生成WebService客户端代码
- php socket邮箱,phpsocket.io php版本的socket.io
- 西门子滚筒洗衣机教程_西门子洗衣机优缺点
- PostgreSQL使用pgAdmin3不能编辑表里的数据
- 调用百度语音合成API,Qt实现语音合成,Qt语音合成
- 【原创】彻底解决2440触摸屏跳点以及抖动问题
- Python学习笔记——工欲善其事,必先利其器
- PageHelper处理分页问题,total总数不对
- ptp精准时间协议_PTP高精度时间同步协议
- 华为短信开发包开发联通sgip1.2接口协议报错
- 没想到,还有小白不知道怎么比较数组是否相等以及检出不匹配项
- Java 蜡烛图_7-13 日K蜡烛图 - osc_9vrg5zhs的个人空间 - OSCHINA - 中文开源技术交流社区...
- 通过网页来打开app指定页面
- es6相关面试题:1.rest参数;2.new.target;3.object.defineProperty与Proxy的区别;4.Reflect对象作用;5.lterator迭代器;6.async
热门文章
- 【IDEA】IntelliJ IDEA代码特效插件-屏幕抖动和颗粒效果
- vba九九乘法表代码_用五种方法利用EXCEL制作九九乘法表
- 入驻华为云·云享专家了~
- C#引用interop.taskscheduler.dll
- 《浪潮之巅》读者热评
- 吴恩达机器学习(十五)—— 应用实例:图片文字识别
- dsoframer-在线编辑office文档,一款开源的由微软提供
- 墨者学院01 SQL手工注入漏洞测试(MySQL数据库)
- Unirech:阿里云国际云服务器ecs建站流程
- 礼盒抖动动画(CocosCreator)