【JUC】第五章 JUC 阻塞队列、线程池
第五章 JUC 阻塞队列、线程池
文章目录
- 第五章 JUC 阻塞队列、线程池
- 一、阻塞队列
- 1.简介
- 2.BlockingQueue 的方法
- 3.常见的 BlockingQueue
- 二、线程池
- 1.简介
- 2.线程池参数
- 3.线程池的种类
- 4.自定义线程池
- 4.线程池底层工作原理
一、阻塞队列
1.简介
Concurrent 包中,BlockingQueue 很好地解决了多线程中,如何高效安全 “传输” 数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利
阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出
- 当队列是空的,从队列中获取元素的操作将会被阻塞,直到其它线程向空的队列插入新的元素
- 当队列是满的,向队列中添加元素的操作将会被阻塞,直到其它线程从队列中移除一个或多个元素
使用阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切阻塞队列都帮我们做好了
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度
在多线程环境中,通过队列可以很容易实现数据共享,比如经典的 “生产者” 和 “消费者” 模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但生产者和消费者在某个时间段内,可能发生数据处理速度不匹配的情况,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待,以便等待消费者线程把累积的数据处理完毕
2.BlockingQueue 的方法
- boolean add(E e)
将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制,在成功后返回 true,如果当前没有可用空间,则抛出 IllegalStateException - boolean contains(Object o)
如果此队列包含指定的元素,则返回 true - int drainTo(Collection<? super E> c)
从该队列中删除所有可用的元素,并将它们添加到给定的集合中 - int drainTo(Collection<? super E> c, int maxElements)
最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中 - boolean offer(E e)
将指定的元素插入到此队列中,如果可以立即执行此操作而不会违反荣狼限制,在成功时返回 true,如果当前没有可用空间,则返回 false - E poll(long timeout, TimeUnit unit)
检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用 - void put(E e)
将指定的元素插入到此队列中,等待空格可用 - int remainingCapcity()
返回该队列剩余容量 - boolean remove(Object o)
从该队列中删除指定元素的单个实例 - E take()
检索并删除此队列的头,如有必要,等待元素可用
3.常见的 BlockingQueue
ArrayBlockingQueue
- 由数组结构组成的有界阻塞队列
相对来说比较常用,是一个基于数组实现的阻塞队列。在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue。按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。之所以没有这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁(默认采用非公平锁)
LinkedBlockingQueue
- 由链表结构组成的有界阻塞队列,大小默认为 Integer.MAX_VALUE
相对来说较为常用,是一个基于链表实现的阻塞队列。和 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回。只有当队列缓冲区达到最大缓存容量时(可以通过构造函数指定)才会阻塞生产者队列,等到消费者从队列中消费掉一份数据再唤醒生产者,这是 LinkedBlockingQueue 之所以能够高效地处理并发数据的一个原因,还有一个原因是其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足矣
DelayQueue
- 使用优先级队列实现的延迟无界阻塞队列
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞
PriorityBlockingQueue
- 支持优先级排序的无界阻塞队列
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者
因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间
在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁
SynchronousQueue
- 不存储元素的阻塞队列,即单个元素的队列
一种无缓冲的等待队列,相对于有缓冲的阻塞队列来说,少了缓冲区。一方面,吞吐量不如有缓冲的阻塞队列;另一方面,对于单个任务响应更加快
声明一个 SynchronousQueue 有两种不同的方式
- 公平模式:采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体现整体的公平策略
- 非公平模式:SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者
LinkedTransferQueue
- 由链表组成的无界阻塞队列
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其它阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法
LinkedTransferQueue 采用一种预占模式,意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个结点(节点元素为 null)入队,消费者线程会在这个结点等待,后面生产者线程入队时发现有一个元素为 null 的结点,生产者线程就不入队了,直接将元素填充到该结点,并唤醒该结点等待的线程,被唤醒的消费者线程取走元素
LinkedBlockingDeque
- 由链表组成的双向阻塞队列
LinkedBlockingDequeue 是一个由链表结构组成的双向阻塞队列,即可以从队列的两队插入和移除元素
对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作,可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况
- 插入元素时:如果当前队列已满,将会进入阻塞状态,一直等到队列有空的位置时再将该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常
- 读取元素时:如果当前队列为空,会发生阻塞,直到队列不为空然后返回元素,同样可以设置超时参数
二、线程池
1.简介
线程池(Thread Pool)是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性能和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度
线程池做的工作就是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行
特点:
- 降低资源能耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性, 使用线程池可以进行统一的分配,调优和监控
- Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor、Executors、ExecutorServic、ThreadPoolExecutor 这几个类
2.线程池参数
- corePoolSize 线程池的核心线程数
- maximumPoolSize 线程池能容纳的最大线程数
- keepAliveTime 空闲线程的存活时间
- unit 存活的时间单位
- workQueue 存放提交但未执行任务的队列
- threadFactory 创建线程的工厂类
- handler 等待队列满后的拒绝策略
拒绝策略
- AbortPolicy
ThreadPoolExecutor 中默认的拒绝策略就是 AbortPolicy,直接抛出 RejectedExecutionException 异常 - CallerRunsPolicy
CallerRunsPolicy 在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务,缺点是可能会阻塞主线程 - DiscardPolicy
采用这个拒绝策略会让线程池拒绝的任务直接抛弃,不会抛异常也不会执行 - DiscardOldestPolicy
DiscardOldestPolicy 策略的作用是,当任务被拒绝添加时,会抛弃任务队列中最旧的任务,也就是最先加入队列的任务,再把这个新任务添加进去
3.线程池的种类
newCachedThreadPool
创建可供缓存的线程池,该线程池中的线程空闲时间超过 60s 会自动销毁
特征:
- 线程池中数量没有固定,可达最大值(Integer.MAX_VALUE)
- 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
- 当线程池中没有可用的线程时,会重新创建一个线程
public static ExecutorService newCachedThreadPool(){/*** corePoolSize * maximumPoolSize * keepAliveTime * unit * workQueue * threadFactory * handler */return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<>(),Executors.defaultThreadFactory().new ThreadPoolExecutor.AbortPolicy());
}
适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间短、任务多的场景
newFixedThreadPool
创建固定线程数的线程池
特征:
- 线程池中的线程维持一定的数量,可以很好地控制线程的并发量
- 线程可以重复被使用,在显示关闭之前,都将一直存在
- 超出一定量的线程被提交时需在队列中等待
public static ExecutorService newFixedThreadPool(){/*** corePoolSize * maximumPoolSize * keepAliveTime * unit * workQueue * threadFactory * handler */return new ThreadPoolExecutor(10,10,0L,TimeUnit.SECONDS,new LinkedBlockQueue<>(),Executors.defaultThreadFactory().new ThreadPoolExecutor.AbortPolicy());
}
适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景
newSingleThreadExecutor
创建只有一个线程的线程池
特征:
- 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
public static ExecutorService newSingleThreadExecutor(){/*** corePoolSize * maximumPoolSize * keepAliveTime * unit * workQueue * threadFactory * handler */return new ThreadPoolExecutor(1,1,0L,TimeUnit.SECONDS,new LinkedBlockQueue<>(),Executors.defaultThreadFactory().new ThreadPoolExecutor.AbortPolicy());
}
适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景
newScheduleThreadPool
创建可供调度使用的线程池(可延时启动,定时启动)
特征:
- 线程池中具有指定数量的线程,即便是空线程也将保留
- 可定时或者延迟执行线程活动
public static ExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory){return new ThreadPoolExecutor(corePoolSize,threadFactory,);
}
适用于需要多个后台线程执行周期任务的场景
newWorkStealingPool
JDK 1.8 提供的线程池,底层使用 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 CPU 核心数的线程来并行执行任务
public static ExecutorService newWorkStealingPool(int parallelism){/*** parallelism:并行级别,通常默认为 JVM 可用的处理器个数* factory:用于创建 ForkJoinPool 中使用的线程* handler:用于处理工作线程未处理的异常,默认为 null* asyncMode:用于控制 WorkQueue 的工作模式:队列--反队列*/return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true);
}
适用于大耗时,可并行执行的场景
4.自定义线程池
package com.sisyphus.pool;import java.util.concurrent.*;/*** @Description: $* @Param: $* @return: $* @Author: Sisyphus* @Date: 8/20*/
public class ThreadPoolDemo {public static void main(String[] args) {ExecutorService threadPool = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());try {//10 个顾客请求for (int i = 1; i <= 10; i++){//执行threadPool.execute(()->{System.out.println(Thread.currentThread().getName() + "办理业务");});}} catch (Exception e) {e.printStackTrace();} finally {//关闭threadPool.shutdown();}}
}
4.线程池底层工作原理
- 在创建了线程池后,线程池中的线程数为零
- 当调用了 execute() 方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建核心线程运行这个任务
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
- 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么创建非核心线程立刻运行这个任务
- 如果队列满了且正在运行的线程数量大于或等于 maximumPooleSize,那么线程会启动饱和拒绝策略来执行
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断当前运行的线程数是否大于 corePoolSize,如果大于则会停掉这个线程
- 所有线程的所有任务完成后,线程池的线程数最终会收缩到 corePoolSize 大小
【JUC】第五章 JUC 阻塞队列、线程池相关推荐
- 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )
文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...
- 浅谈Java锁,与JUC的常用类,集合安全类,常用辅助类,读写锁,阻塞队列,线程池,ForkJoin,volatile,单例模式不安全,CAS,各种锁
浅谈JUC的常用类 JUC就是java.util.concurrent-包下的类 回顾多线程 Java默认有几个线程? 2 个 mian.GC Java 真的可以开启线程吗? 开不了,点击源码得知:本 ...
- 【JUC并发编程10】阻塞队列
文章目录 10 阻塞队列 10.1 阻塞队列概述 10.2 阻塞队列架构 10.3 阻塞队列分类 10.4 阻塞队列核心方法 10 阻塞队列 10.1 阻塞队列概述 阻塞队列是共享队列(多线程操作), ...
- 【JUC】第一章 JUC概述、Lock 接口
第一章 JUC 概述.Lock 接口 文章目录 第一章 JUC 概述.Lock 接口 一.JUC 概述 1.什么是 JUC 2.线程和进程概念 3.线程的状态 4.并发与并行 5.管程 6.用户线程和 ...
- Python 线程----线程方法,线程事件,线程队列,线程池,GIL锁,协程,Greenlet
主要内容: 线程的一些其他方法 线程事件 线程队列 线程池 GIL锁 协程 Greenlet Gevent 一. 线程(threading)的一些其他方法 from threading import ...
- 网络编程9_线程-条件,定时器,队列,线程池, 协程
线程 一. 条件 使得线程等待,只有满足某条件时,才释放n个线程 import time from threading import Thread,RLock,Conditio ...
- 线程池的五种状态及创建线程池的几种方式
上篇<Java线程的6种状态详解及创建线程的4种方式> 前言:我们都知道,线程是稀有资源,系统频繁创建会很大程度上影响服务器的使用效率,如果不加以限制,很容易就会把服务器资源耗尽.所以,我 ...
- 线程队列 线程池 协程
1 . 线程队列 from multiprocessing Queue , JoinableQueue #进程IPC队列 from queue import Queue #线程队列 先进先出 f ...
- 线程队列,线程池和协程
线程的其他方法: threading.current_thread() #当前线程对象 getName() # 获取线程名 ident # 获取线程id threading.enumerate ...
最新文章
- 活动|跟着微软一起,拥抱开源吧!
- 家庭财务管理系统c语言论文,《家庭财务管理系统》-毕业论文.doc
- mysql如果存在则删除数据库_怎么判断sql数据库是否存在,存在删除
- Docker网络相关
- 【APIO2018】Duathlon 铁人两项 【圆方树】
- MyBatis由浅入深学习总结之二:MyBatis解决Java实体类和数据库表字段不一致方法总结
- InputService
- Oracle使用sqluldr2
- 双眼融合训练一个月_双眼视觉是什么?为什么要进行视功能训练?
- 北京市三级医院电话预约挂号一览表
- 华硕服务器系统安装系统安装教程视频,华硕的系统安装教程 华硕u盘安装系统教程...
- 叉乘应用:判断三角形方向正反/三个点顺时针逆时针
- vbox虚拟机网络设置
- 30天自制操作系统——自写设计
- 基于CNN的人脸 性别、年龄识别
- 使用certbot完成证书的自动发放
- 申请清华大学计算机类的理由,青年人选择清华的七个理由
- 2021-2025年中国军用浮桥行业市场供需与战略研究报告
- mac笔记——from“http://macshuo.com/”
- 【Python】Flask学习笔记
热门文章
- python 发送邮件不显示附件_求助:写python脚本发 带有附件的邮件, 收到邮件后,发现附件直接显示在屏幕上了,而不是以附件形式...
- bz2解压命令_Java压缩技术 - tar.bz2解压缩
- acm之java输入输出_ACM中Java输入输出
- mui-scroll-wrapper mui-scroll 内容增多不出滚动条
- Spring Boot 针对 Java 开发人员的安装指南
- ubuntu 虚拟机上的 django 服务,在外部Windows系统上无法访问
- oracle连接总结(内连接、外连接、自然连接,交叉连接,自连接)
- 配置Hibernate二级缓存步骤
- C++持有Object-C对象时容易内存泄露
- 在WinForm中使用Web Service来实现软件自动升级