下面是最常见的线程池的使用和声明方式:

public class ThreadTest {ExecutorService fixedThreadPool = Executors.newFixedThreadPool(50);public void dothing(){for(int i=0;i<50;i++){fixedThreadPool.execute(new Runnable() {@Overridepublic void run() {//do something}});}fixedThreadPool.shutdown();//关闭线程池//此处不可以删除或注释,需要线程执行结束后再执行别的内容,即只有线程结束后才会继续向下执行while (!fixedThreadPool.isTerminated()) {}}
}

上面的声明方式存在下面几个问题:

1)fixedThreadPool为对象变量,每个对象都会产生新的线程池,这就意味着虽然声明的是fixedThreadPool,但实际上线程的数量是不可控的,因为可能同时产生多个对象,每个对象分别有一个线程池,所以就有可能出现多个线程池,虽然每个线程自身占用的内存有限,但是每个线程执行过程中产生的对象所占用的内存却是不可控的,可能多达几十甚至上百兆。此时则会消耗大量内存,报内存溢出错误

2)另外,建立线程池的目的之一就是因为线程的重建和销毁会消耗时间和空间,线程池可以实现线程的重复利用,节约线程的销毁和重建成本,但是上面的最后两行代码,会在任务执行完后就关闭线程池,并销毁线程,并没能充分利用线程池的优势。

3)使用executors创建线程池有四个,single、fixed、catched、secheduled,这四种线程池都存在可能导致OOM问题,single和fixed虽然限制了线程个数,但是允许线程等待的队列最大个数却是Integer.MAX_VALUE,可能会堆积大量的请求,导致OOM;catched和secheduled方式,直接线程个数就是Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

4)上面run方法里面的内容每有进行try catch捕获异常,那么如果run方法中出现了异常,就会直接导致线程终止,每次都需要重建线程,等于线程池没有发挥作用,高并发下,可能会导致CPU和内存占用飙升,甚至jvm挂掉。解决方案就是run方法内部try catch

如果确定你的并发量有限,并且每个线程占用的内存大小有限,你可以使用Executors来建立线程池,

所以使用Executors建立线程池的正确使用方式,针对问题1和2,应该将线程池声明为静态变量,如下:

    public static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(50);

此时,fixedThreadPool是类变量,由所有的ThreadTest类的对象共享,注意此时不可以再有如下代码:

        fixedThreadPool.shutdown();//关闭线程池//此处不可以删除或注释,需要线程执行结束后再执行别的内容,即只有线程结束后才会继续向下执行        while(!fixedThreadPool.isTerminated()) { }

即线程池不能关闭,如果关闭了会报如下异常:

java.util.concurrent.RejectedExecutionException。

不能shutdown,笔者在实际场景遇到了一个问题:

上面的线程池实际是向数据库中插入数据,执行完后需要在页面展示数据坤插入的数据,因为无法使用shutdown和isTerminated方法,导致查的时候数据还没有插入。解决的方案很简单,让主线程睡眠两秒,即

Thread.currentThread().sleep(2000);

。两秒足够线程池中的任务执行完毕。

如果你的并发量没有办法控制,并且每个线程占用的内存大小无法确定较小,那么你需要使用ThreadPoolExecutor的方式来创建线程。

其构造函数签名如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

参数介绍:

corePoolSize 核心线程数,指保留的线程池大小(不超过maximumPoolSize值时,线程池中最多有corePoolSize 个线程工作)。

maximumPoolSize 指的是线程池的最大大小(线程池中最大有corePoolSize 个线程可运行)

keepAliveTime 指的是空闲线程结束的超时时间(当一个线程不工作时,过keepAliveTime 长时间将停止该线程)。

unit 是一个枚举,表示 keepAliveTime 的单位(有NANOSECONDS, MICROSECONDS,

MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7个可选值)。

workQueue 表示存放任务的队列(存放需要被线程池执行的线程队列)。

threadFactory - 执行程序创建新线程时使用的工厂。

handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

参数之间关系如下:

第一类:控制被处理线程:corePoolSize,maximumPoolSize,workQueue会决定你的线程处理顺序,首先进来线程会将该线程池的实际线程数变为corePoolSize,达到corePoolsize后,再进来的线程就会进入workQueue来排队;如果workQueue满了,再进来的线程就会继续创建线程,直到实际个数达到maximumPoolSize;

无论是对列里面的排队线程,还是实际正在处理的线程,都会占用内存,所以需要根据实际每个执行线程占用内存的大小合理corePoolSIze,maximumPoolSize,和workQueue。

池中线程数大小的选择策略:
如果纯计算的任务,多线程并不能带来性能提升,因为CPU处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为CPU数量或加一;
如果任务包含大量IO/网络等待等,线程数 = CPU 核数 × 目标 CPU 利用率 ×(1 + 平均等待时间 / 平均工作时间)

而关于workQueue有下面三种方式:

1、直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes, 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

2、无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

3、有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开 销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

第二类:RejectedExecutionHandler:无法处理线程的handler

RejectedExecutionHandler接口提供了对于拒绝任务的处理的自定方法的机会。在ThreadPoolExecutor中已经默认包含了4中策略,因为源码非常简单,这里直接贴出来。

CallerRunsPolicy(直接运行run方法):线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

2.                 if (!e.isShutdown()) {

3.                     r.run();

4.                 }

5.             }

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用线程本身run方法来执行。

AbortPolicy(抛异常):处理程序遭到拒绝将抛出运行时RejectedExecutionException

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

2.                 throw new RejectedExecutionException();

3.             }

这种策略直接抛出异常,丢弃任务。

DiscardPolicy(直接丢弃):不能执行的任务将被删除

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

2.             }

这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

DiscardOldestPolicy(抛弃最老的线程):如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后

重试执行程序(如果再次失败,则重复此过程)

1.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

2.                 if (!e.isShutdown()) {

3.                     e.getQueue().poll();

4.                     e.execute(r);

5.                 }

}

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

设想:如果其他线程都还在运行,那么新来任务踢掉旧任务,缓存在queue中,再来一个任务又会踢掉queue中最老任务

当然,还有别的使用ExecutorService+FutureTask+Callable方式应该也可以,可参考这里。

ThreadFactory的使用

当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器(这是Thread类中的接口)。所以我们可以在创建线程的时候声明异常处理器。实例如下:
public class MyUnchecckedExceptionhandler implements UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("捕获异常处理方法:" + e);
    }
}
ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
                return thread;
            }
});

线程池的源码

execute在哪里启动额线程?
怎么调度的 ?线程将任务执行完毕怎么加另一个任务?
第一次创建线程(Worker) 的时候,调用start方法开启线程,然后worker的run方法会启动传入的runnable的fun方法;
然后不断判断当前runnable是否空,空说明执行完,获取新runnable的直接执行run方法

线程池中线程如何复用的?

worker,大的while循环,不断到队列里面获取,如果有任务执行任务,否则sleep。sleep好了在执行任务或sleep

https://blog.csdn.net/anhenzhufeng/article/details/88870374

-----------------------------------------------------------------------------------------------------------------------

更新与2021年11月,最佳实践如下:

1 根据机器资源创建线程池

public class StuClassInfoThreadExecutor {/*** 线程池名称*/private static final String STU_CLASS_INFO_THREAD_FACTORY_NAME = "stu-class-info-thread-pool";/*** cpu可用核数*/private static final int DEFAULT_CPU_PROCESSORS = Runtime.getRuntime().availableProcessors();/*** spring支持的线程池任务包装类*/private static final ThreadPoolTaskExecutor STU_CLASS_INFO_THREAD_EXECUTOR = new ThreadPoolTaskExecutor();/*** 默认线程池*/private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(STU_CLASS_INFO_THREAD_FACTORY_NAME);static {STU_CLASS_INFO_THREAD_EXECUTOR.setThreadFactory(DEFAULT_THREAD_FACTORY);//此线程池负责业务属于IO密集型,设置核心线程数为cpu核数*2STU_CLASS_INFO_THREAD_EXECUTOR.setCorePoolSize(DEFAULT_CPU_PROCESSORS * 5);STU_CLASS_INFO_THREAD_EXECUTOR.setMaxPoolSize(DEFAULT_CPU_PROCESSORS * 25);STU_CLASS_INFO_THREAD_EXECUTOR.setQueueCapacity(20);//默认值就是60seconds,显示声明的目的是直观看到线程池中线程的存活时间STU_CLASS_INFO_THREAD_EXECUTOR.setKeepAliveSeconds(60);//CallerRunsPolicy这个拒绝策略代表的是,如果工作列队满了(超过QueueCapacity)同时MaxPoolSize也达到了阀值,那么当前任务使用主线程调用STU_CLASS_INFO_THREAD_EXECUTOR.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//核心线程如果超过了keepTimeOut时间也会进行回收,设置这个参数的原因是因为其他maven-module可能不需要依赖这个线程池,从而造成线程资源的浪费
//        STU_CLASS_INFO_THREAD_EXECUTOR.setAllowCoreThreadTimeOut(true);STU_CLASS_INFO_THREAD_EXECUTOR.initialize();}/*** 获取线程池任务实例** @return ThreadPoolTaskExecutor*/public static ThreadPoolTaskExecutor getInstance() {return STU_CLASS_INFO_THREAD_EXECUTOR;}}

使用

private List<ClassByStudentDto> getStudentClassesEnhanced(List<RegistCoreDto> registerDtoList, String cityCode){CompletableFuture<ListMultimap<String, ClassByStudentDto>> classIdClassDtoMultimap =CompletableFuture.supplyAsync(() -> this.fetchClassDetails(registerDtoList, cityCode), StuClassInfoThreadExecutor.getInstance());// 因为需要线程中的异常做降级(CourseBiz.getStudentClasses),所以这里取消原本的异步,使用主线程执行
//        CompletableFuture<Map<String, List<StudentLessonDto>>> registerIdToLessons =
//                CompletableFuture.supplyAsync(() -> this.getAllRegisterLessons(registerDtoList, cityCode), StuClassInfoThreadExecutor.getInstance());Map<String, List<StudentLessonDto>> registerIdToLessons = this.getAllRegisterLessons(registerDtoList, cityCode);//        CompletableFuture<List<ClassByStudentDto>> results = classIdClassDtoMultimap
//                .thenCombineAsync(registerIdToLessons, (classDtoMultimap, registerLessons) ->
//                        this.getClassByStudentDtos(cityCode, classDtoMultimap, registerLessons), StuClassInfoThreadExecutor.getInstance());CompletableFuture<List<ClassByStudentDto>> results = classIdClassDtoMultimap.thenApplyAsync( (classDtoMultimap) ->this.getClassByStudentDtos(cityCode, classDtoMultimap, registerIdToLessons), StuClassInfoThreadExecutor.getInstance());return results.exceptionally(e -> {alarmService.reportException(e);return List.of();}).join();}

CompletableFuture执行任务

在使用FutureTask来完成异步任务,通过get()方法获取结果时,会让获取结果的线程进入阻塞等待,这种方式并不是最理想的状态。

JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture传入回调对象,任务在完成或者异常时,自动回调。

public class CompletableFutureDemo {public static void main(String[] args) throws InterruptedException {// 创建CompletableFuture时传入Supplier对象CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new MySupplier());//执行成功时future.thenAccept(new MyConsumer());// 执行异常时future.exceptionally(new MyFunction());// 主任务可以继续处理,不用等任务执行完毕System.out.println("主线程继续执行");Thread.sleep(5000);System.out.println("主线程执行结束");}
}class MySupplier implements Supplier<Integer> {@Overridepublic Integer get() {try {// 任务睡眠3sTimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 3 + 2;}
}
// 任务执行完成时回调Consumer对象
class MyConsumer implements Consumer<Integer> {@Overridepublic void accept(Integer integer) {System.out.println("执行结果" + integer);}
}
// 任务执行异常时回调Function对象
class MyFunction implements Function<Throwable, Integer> {@Overridepublic Integer apply(Throwable type) {System.out.println("执行异常" + type);return 0;}
}
复制代码

以上代码可以通过lambda表达式进行简化。

public class CompletableFutureDemo {public static void main(String[] args) throws InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {// 任务睡眠3sTimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 3 + 2;});//执行成功时future.thenAccept((x) -> {System.out.println("执行结果" + x);});future.exceptionally((type) -> {System.out.println("执行异常" + type);return 0;});System.out.println("主线程继续执行");Thread.sleep(5000);System.out.println("主线程执行结束");}
}
复制代码

通过示例我们发现CompletableFuture的优点:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

当然这些优点还不足以体现CompletableFuture的强大,还有更厉害的功能。

串行执行

多个CompletableFuture可以串行执行,如第一个任务先进行查询,第二个任务再进行更新

public class CompletableFutureDemo {public static void main(String[] args) throws InterruptedException {// 第一个任务CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1234);// 第二个任务CompletableFuture<Integer> secondFuture = future.thenApplyAsync((num) -> {System.out.println("num:" + num);return num + 100;});secondFuture.thenAccept(System.out::println);System.out.println("主线程继续执行");Thread.sleep(5000);System.out.println("主线程执行结束");}
}
复制代码

并行任务

CompletableFuture除了可以串行,还支持并行处理。

public class CompletableFutureDemo {public static void main(String[] args) throws InterruptedException {// 第一个任务CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> 1234);// 第二个任务CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> 5678);// 通过anyOf将两个任务合并为一个并行任务CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(oneFuture, twoFuture);anyFuture.thenAccept(System.out::println);System.out.println("主线程继续执行");Thread.sleep(5000);System.out.println("主线程执行结束");}
}
复制代码

通过anyOf()可以实现多个任务只有一个成功,CompletableFuture还有一个allOf()方法实现了多个任务必须都成功之后的合并任务。
作者:小黑说Java
链接:https://juejin.cn/post/7008318996827078693
来源:稀土掘金

更多关于completableFuture的介绍

如何编写优雅的异步代码 — CompletableFuture - 掘金

CompletableFuture 使用详解 - 简书

												

java线程池的正确使用方式,completableFuture相关推荐

  1. java线程池和线程实例化_浅谈Java 线程池原理及使用方式

    一.简介 什么是线程池? 池的概念大家也许都有所听闻,池就是相当于一个容器,里面有许许多多的东西你可以即拿即用.java中有线程池.连接池等等.线程池就是在系统启动或者实例化池时创建一些空闲的线程,等 ...

  2. java线程池4种使用方式

    Java通过Executors提供四种线程池,分别为: newCachedThreadPool 可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程. newFixed ...

  3. java线程池存在时间_Java线程池基础

    目录: 一.线程池概述 1.线程池类 目前线程池类一般有两个,一个来自于Spring,一个来自于JDK: 来自Spring的线程池:org.springframework.scheduling.con ...

  4. java线程池的应用_Java线程池的使用

    Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了, ...

  5. Java线程池「异常处理」正确姿势:有病就得治

    假设我们有一个线程池,由于程序需要,我们向该线程池中提交了好多好多任务,但是 这些任务都没有对异常进行try catch处理,并且运行的时候都抛出了异常 .这会对线程池的运行带来什么影响? 正确答案是 ...

  6. java线程池shutdown_关闭线程池的正确姿势,shutdown(), shutdownNow()和awaitTermination() 该怎么用?...

    关闭线程池的正确姿势,shutdown(), shutdownNow()和awaitTermination() 该怎么用? ExecutorService 接口提供了三个方法用于手动关闭线程池,分别是 ...

  7. Java多线程系列(三):Java线程池的使用方式,及核心运行原理

    之前谈过多线程相关的4种常用Java线程锁的特点,性能比较.使用场景,今天主要分享线程池相关的内容,这些都是属于Java面试的必考点. 为什么需要线程池 java中为了提高并发度,可以使用多线程共同执 ...

  8. Java线程池的四种创建方式

    Java线程池的四种创建方式 Java使用Thread类来表示线程,所有的线程都是Thread类或者是他的子类.Java有四种方式来创建线程. (1)继承Thread类创建线程 (2)实现Runnab ...

  9. java创建线程池几种方式_java知识总结-创建线程池的6种方式

    一.创建线程池的6种方式: Executors.newCachedThreadPool(); 创建一个可缓存线程池,应用中存在的线程数可以无限大 Executors.newFixedThreadPoo ...

最新文章

  1. 如何安装部署salt yum?
  2. VMware宣布完成27亿美元收购Pivotal;日本成功研发出6G芯片:单载波速度高达100Gbps;联想手机再换新掌门……...
  3. mysql查询索引位置_mysql索引在什么位置
  4. 【转】ASPxGridView 日期范围过滤扩展
  5. 【codevs3372】选学霸,并查集+可达性DP
  6. C ++ 指针 | 指针与字符_4
  7. Matlab并行编程cellfun arrayfun
  8. centos-rpm
  9. 超燃:2019 中国.NET 开发者峰会视频发布
  10. [ubuntu]Ubuntu查看cpu温度
  11. halcon代码LAWS纹理滤波
  12. 清理谷歌浏览器注册表_Win10系统下注册表chrome残留无法删除
  13. 电脑系统或者office系列使用小马激活工具,电脑开机桌面出现假的Edge浏览器
  14. idea2018破解有效期至2100年
  15. HTML中加入背景音乐
  16. 生成排列(全排列)的两种写法
  17. 【实习日记】Linux-VM15-Ubuntu18.04 + 运行selenium实现文件下载
  18. 区块链DAPP开发 以太坊智能合约框架有哪些
  19. ROS之msg文件定义以及自定义发布主题消息类型
  20. 计算机组成原理 微机,【2017年整理】计算机组成原理-微机实验指导书.doc

热门文章

  1. Unity角色同时播放两个音效(走路音效+说话音效)
  2. arm linux 加密锁,ET ARM 标准锁(包含网络锁功能)
  3. Nmap入门:隐私刺探
  4. 郑志远的java学习笔记
  5. Android之动画全讲-刘志远-专题视频课程
  6. Matlab基础编程知识处理(2)(数学建模中模型的模拟与数据提取,本篇全干货)
  7. XMD couldn't connect to remote target
  8. 基于Linux的tty架构及UART驱动详解
  9. dB单位理解,声音,天线,功率
  10. 牛客练习赛56 A-小蒟和他的乐谱