Java线程池ThreadPool详解

  • 1. 线程池概述
    • 1.1 线程池简介
    • 1.2 线程池特点
    • 1.3 线程池解决问题
  • 2. 线程池原理分析
    • 2.1 线程池总体设计
    • 2.6 线程池流转状态
    • 2.2 线程池执行流程
    • 2.3 线程池核心参数
    • 2.4 线程池饱和策略
    • 2.5 线程池阻塞队列
    • 2.7 线程池关闭方式
    • 2.8 线程池数量配置
  • 3. 线程池工具类
  • 4. Executors创建线程池注意事项
  • 5. 线程池手动创建
  • 6. SpringBoot创建线程池

1. 线程池概述

1.1 线程池简介

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

1.2 线程池特点

1.降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。

2.提高响应速度:任务到达时,无需等待线程创建即可立即执行。

3.提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

4.提供其他功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

1.3 线程池解决问题

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

1.频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
2.对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。3.系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

2. 线程池原理分析

2.1 线程池总体设计

ThreadPoolExecutor是Java中的线程池核心实现类,以下是ThreadPoolExecutor的UML类图。
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力:
(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
(2)提供了管控线程池的方法,比如停止线程池的运行。

2.6 线程池流转状态

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种,分别为

2.2 线程池执行流程

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

1.首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
2.如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
3.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
4.如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
5.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

2.3 线程池核心参数

ThreadPoolExecutor线程池核心参数

corePoolSize:线程池的核心线程数,线程池初始化创建的最小线程数量。
maximumPoolSize:最大线程数,当阻塞队列满了以后,还可以创建线程的最大数量。
keepAliveTime:空闲线程存活时间,核心线程之外创建的线程没有任务的时候不是立即销毁,超过等待时间之后才会被回收销毁。
unit:存活的时间单位,keepAliveTime的时间单位。
workQueue:阻塞队列,存放提交但未执行任务的队列。
threadFactory:创建线程的工厂类,用来创建线程执行器。
handler:饱和策略,当前线程超过线程池最大线程数后处理策略。

2.4 线程池饱和策略

AbortPolicy: 默认策略,抛出异常RejectedExecutionException,拒绝执行 。

CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且要求任何一个任务请求都要被执行的话,你可以选择这个策略。

DiscardPolicy: 不处理新任务,直接丢弃掉。
DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

2.5 线程池阻塞队列

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2.7 线程池关闭方式

shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表。
shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略。
isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true。

2.8 线程池数量配置

线程池配置数量需要根据实际情况决定,比如业务类型,机器的IO,CPU,内存配置等,可以参考业界做法,但是要因地制宜。比如netty默认就是:实际 cpu核数 * 2。

NettyRuntime.availableProcessors() * 2

1.如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1。
2.如果是IO密集型任务,参考值可以设置为2*NCPU。

3. 线程池工具类

Executors 是一个 Java 中的工具类。提供四种线程池创建方式,工厂方法来创建不同类型的线程池。Executors 的创建线程池的方法,创建出来的线程池都实现了ExecutorService 接口。常用方法有以下几个:

1.newFiexedThreadPool(int Threads):创建固定数目线程的线程池。
2.newCachedThreadPool():创建一个可缓存的线程池,调用 execute将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
3.newSingleThreadExecutor() 创建一个单线程化的 Executor。
4.newScheduledThreadPool(int corePoolSize) 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代 Timer 类。

4. Executors创建线程池注意事项

不建议大家使用Executors这个类来创建线程池呢,阿里开发手册这样定义:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。

项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。
所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池。

5. 线程池手动创建

package com.zrj.unit.juc;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Test;import java.util.concurrent.*;/*** 线程池* 注意:ThreadFactoryBuilder这里需要运用guava包,自定义线程名称** @author zrj* @since 2021/8/20**/
public class ThreadPoolExecutorTest {//线程池的核心线程数private static int corePoolSize = 30;//能容纳的最大线程数private static int maximumPoolSize = 200;//空闲线程存活时间private static long keepAliveTime = 0L;//空闲线程存活时间 单位private static TimeUnit unit = TimeUnit.MILLISECONDS;//创建线程的工厂类,自定义线程名称private static String threadName = "thread-local-pool-%d";private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadName).build();//存放提交但未执行任务的队列private static BlockingQueue<Runnable> threadFactory = new LinkedBlockingQueue<>(1024);//等待队列满后的拒绝策略private static RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//定义线程池private static ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, namedThreadFactory, handler);// 定义售票窗口数private static int saleWindows = 10;// 自定义线程名称private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").build();// 倒计时锁存器private static CountDownLatch latch = new CountDownLatch(saleWindows);/*** 模拟抢票*/@Testpublic void saleTicket() {try {for (int i = 0; i < saleWindows; i++) {pool.execute(() -> {try {String threadName = Thread.currentThread().getName();System.out.println(threadName + "窗口开始售票");Thread.sleep(1000);System.out.println(threadName + "窗口售票完成");} catch (InterruptedException e) {System.out.println("售票异常:" + e);} finally {latch.countDown();}});}} catch (Exception e) {System.out.println("系统异常:" + e);} finally {pool.shutdown();}// 等待所有线程到达放行try {latch.await();} catch (Exception e) {System.out.println("系统异常," + e);}System.out.println("所有线程执行完成,继续执行主线程");}
}

6. SpringBoot创建线程池

springboot创建线程池,Spring提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用。
Async相当于是方法级别的线程,本身没有自定义线程池更加灵活
相当于是每进来一个请求就开启一个线程,超过核心线程数小于最大线程数放入队列,队列满了,继续创建线程直至达到最大线程数。

application.properties

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-

ExecutorConfig

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** 线程池的配置** @author zrj* @since 2021/9/9**/
@Slf4j
@EnableAsync
@Configuration
public class ExecutorConfig {@Value("${async.executor.thread.core_pool_size}")private int corePoolSize;@Value("${async.executor.thread.max_pool_size}")private int maxPoolSize;@Value("${async.executor.thread.queue_capacity}")private int queueCapacity;@Value("${async.executor.thread.name.prefix}")private String namePrefix;@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//执行初始化executor.initialize();return executor;}
}

AsyncService

/*** @author zrj* @since 2021/9/9**/
public interface AsyncService {void executeAsync();
}

AsyncServiceImpl

package com.zrj.unit.service.impl;import com.zrj.unit.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;/*** 线程服务* Async相当于是方法级别的线程,本身没有自定义线程池更加灵活* 相当于是每进来一个请求就开启一个线程,超过核心线程数小于最大线程数放入队列,* 队列满了,继续创建线程直至达到最大线程数** @author zrj* @since 2021/9/9**/
@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {@Override@Async("asyncServiceExecutor")public void executeAsync() {log.info( "start executeAsync" );try {System.out.println( "异步线程要做的事情" );System.out.println( "可以在这里执行批量插入等耗时的事情" );Thread.sleep( 1000 );} catch (InterruptedException e) {e.printStackTrace();}log.info( "end executeAsync" );}
}

AsyncController

package com.zrj.unit.controller;import com.zrj.unit.service.AsyncService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** 线程控制器** @author zrj* @since 2021/8/30**/
@RestController
@RequestMapping("/async")
public class AsyncController {@Resourceprivate AsyncService asyncService;/*** 多线程*/@GetMapping("/executor")public String executor() {//开30个线程for (int i = 0; i < 30; i++) {asyncService.executeAsync();}return "executor success";}
}

参考文档
Java线程池实现原理及其在美团业务中的实践
Java线程池原理及使用
Java线程池详解

Java线程池ThreadPool详解相关推荐

  1. java线程池使用详解ThreadPoolExecutor使用示例

    一 使用线程池的好处 二 Executor 框架 2.1 简介 2.2 Executor 框架结构(主要由三大部分组成) 1) 任务(Runnable /Callable) 2) 任务的执行(Exec ...

  2. Java线程池(Executor)详解和用法

    背景 面试的时候经常会被三连问.用过吗?如何用的?场景是什么?所以有必要好好的研究下线程池迫在眉睫. 1.讲解之前先了解下 retry: 因为源码中有这个retry标记 先看一个简单的例子 /*** ...

  3. java线程池使用详解

    http://automaticthoughts.iteye.com/blog/1612388 一 简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使 ...

  4. Java线程池参数详解

    1:前言 在使用线程池时,为了获取最佳的性能,常常需要手动指定线程池的参数,ThreadPoolExecutor是最常用的线程池执行器,它有四个构造方法,参数最多的构造方法有7个参数,下面将详细介绍这 ...

  5. 线程池ThreadPool详解

    线程池介绍 可以复用线程池的每一个资源 控制资源的总量 为什么要使用线程池 问题一:反复创建线程开销大 问题二:过多的线程会占用太多内存 解决以上两个问题的思路 • 用少量的线程--避免内存占用过多 ...

  6. async spring 默认线程池_Spring boot注解@Async线程池实例详解

    这篇文章主要介绍了Spring boot注解@Async线程池实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从Spring3开始提供了@A ...

  7. 线程池源代码详解,参数详解

    线程池源代码详解,参数详解 ThreadPoolExecutor 构造函数源代码 public ThreadPoolExecutor(int corePoolSize, int maximumPool ...

  8. python线程池原理_Python定时器线程池原理详解

    这篇文章主要介绍了Python定时器线程池原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 定时器执行循环任务: 知识储备 Timer(int ...

  9. java线程池详解及五种线程池方法详解

    基础知识 Executors创建线程池 Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThrea ...

最新文章

  1. 【ACM】杭电OJ 2015
  2. htaccess分布式配置文件常用写法
  3. Flink从入门到精通100篇(三)-如何利用InfluxDB+Grafana搭建Flink on YARN作业监控大屏环境
  4. java文件头_对java文件头的解析
  5. mysql命令导入存储过程报错_mysql导入存储过程时declare报错的有关问题解决
  6. JAVA 入门(一)
  7. MySQL使用用户变量需确定取值的顺序
  8. 构建负载均衡服务器之一 负载均衡与集群详解
  9. WEB安全基础-SQL注入基础
  10. c语言最短延时程序,linux下写个C语言程序,要求有0.5微秒以下的延时,要怎样写...
  11. php在线白板,C#实现网络电子白板、课件功能 (在线教学系统)
  12. Html之图片轮播(锚)
  13. mysql数据库第五章_第五章 数据库的查询
  14. 微信公众号开发工具包
  15. 顺序表的基本操作实现
  16. Phyton Socket发送接收Modbus数组
  17. gls开发_广义最小二乘gls数学推导直觉
  18. 安卓设备的Socket网络通讯例程 (A2) -UI界面适配
  19. linux 家目录没有ssh文件夹,ssh – 如何远程列出已配置用户主目录中的目录
  20. 基于高德地图实现可编辑的电子围栏功能【多边形围栏】

热门文章

  1. 深圳的发展历程[图片]
  2. 大化设计模式学习笔记(简单工厂模式)
  3. 东北大学考研计算机好考么,东北大学计算机考研经验分享
  4. 服务器共享文件怎么自动备份,如何用批处理把域的文件服务器中的共享资源备份到另一台电脑中...
  5. 东京感受4-异国发现
  6. 杨冬敏老师:老板懂点财务知识,看懂三张报表
  7. 全新OCR3500数据
  8. 直流电机恒转速闭环调节控制系统(项目实战)
  9. RabbitMQ介绍以及五种工作模式
  10. Android Studio——android中handler用法总结