JAVA并发编程实战-任务执行
目录
- 思维导图
- 1 在线程中执行任务
- 1.1 顺序执行任务
- 1.2 显式的为任务创建线程
- 1.3 无限制创建线程的缺点
- 2 Executor框架
- 2.1 使用Executor实现WebServer
- 2.2 执行策略
- 2.3 线程池
- 2.3.1 定长线程池-newFixedThreadPool
- 2.3.2 可缓存线程池-newCachedThreadPool
- 2.3.3 单线程化的newSingleThreadExecutor
- 2.3.4 定时的线程池-newScheduledThreadPool
- 2.4 Executor生命周期
- 2.5 延迟的、周期性的任务
- 3 寻找可强化的并发性-以页面渲染器为例
- 3.1 顺序执行的页面渲染器
- 3.2 使用Future实现页面渲染器
- 3.3 使用CompletionService进行页面渲染
- 参考文献
思维导图
1 在线程中执行任务
多数并发程序是围绕任务进行管理的,所谓任务就是抽象、离散的工作单元。
正常情况下,服务器应该兼顾良好的吞吐量和快速的响应性。在负荷状况下,应该平缓的劣化,不应该快速失败,为了达到这些策略,应该有一个明确的任务执行策略。
1.1 顺序执行任务
应用程序内部有多种任务调度策略,其中简单的策略是:
单一的线程中顺序的执行任务。
如下示例demo:
public class OrderExecutionExample {public static void main(String[] args) {try {ServerSocket serverSocket = new ServerSocket(8000);//顺序执行,接受连接,处理连接while (true) {Socket accept = serverSocket.accept();handlerSocket(accept);System.out.println(Thread.activeCount());}} catch (IOException e) {e.printStackTrace();}}private static void handlerSocket(Socket accept) {//处理acceptSystem.out.println("accept");}
}
顺序的接受请求,处理请求,均在主线程进行处理。
顺序处理并发极低,必须等待一个请求结束才能响应下一个请求。
1.2 显式的为任务创建线程
为了更好提供服务,可以为每个请求创建一个线程,如下demo:
public class ConcurrentExecutionExample {public static void main(String[] args) {try {ServerSocket socket = new ServerSocket(9000);//并发处理请求while (true) {Socket accept = socket.accept();new Thread(() -> {handlerAccept(accept);}).start();}} catch (IOException e) {e.printStackTrace();}}private static void handlerAccept(Socket accept) {System.out.println("处理连接");}
}
这种情况主线程负责处理接受请求,子线程负责任务处理。意味着下面三个结论:
1. 执行任务的负载已经脱离了主线程。
2. 并行处理任务,使得可以多个请求同时得到服务。
3. 任务处理代码需要线程安全,因为每个处理线程都会调用它。
1.3 无限制创建线程的缺点
类似1.2中每个请求一个线程,会导致创建大量线程
实际中创建大量线程会有各种问题:
- 线程生命周期的开销。创建和关闭线程都需要借助操作系统,花费大量时间。
- 资源销毁高。线程会占用系统资源,主要是内存。
- 稳定性问题。应该限制创建线程个数。
通常来说,在一定范围增加创建线程,可以提高系统的吞吐量,一旦超出范围,创建更多线程可能占用过多资源,导致程序出现各种问题。
2 Executor框架
任务是逻辑上的工作单元,线程是任务异步执行的机制。
Executor为任务提交和任务执行提供了解耦的标准方法。
2.1 使用Executor实现WebServer
替换显示的创建线程,交由Executor去管理线程和任务提交,示例demo如下:
public class TaskExecutorExample {private static final int NTHREAD = 10;private static final Executor EXECUTOR = new ThreadPoolExecutor(NTHREAD, NTHREAD, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100));public static void main(String[] args) {try {ServerSocket socket = new ServerSocket(8000);while (true) {Socket accept = socket.accept();EXECUTOR.execute(() -> {handlerAccept(accept);});}} catch (IOException e) {e.printStackTrace();}}private static void handlerAccept(Socket accept) {System.out.println("处理 accept");}
}
通过Executor我们将任务提交和执行进行了解耦,代替了硬编码的创建线程。
2.2 执行策略
一个执行策略明确了需要在任务执行过程关注的点:
- 任务在什么线程执行?
- 任务以什么方式执行?
- 可以有多少个任务并发执行?
- 可以有多少任务进入等待队列?
- 如果任务过载,需要放弃任务,怎么办?
- 一个任务执行前后,应该做什么?
执行策略是对资源管理的工具,最佳策略取决于可用的计算资源和你对服务质量的要求。
2.3 线程池
线程池管理工作者线程,帮助我们管理工作线程的创建和销毁,工作队列的处理,可用让我们更加关注任务的编写上。
类库Executors提供了我们多种创建线程池的静态方法。
2.3.1 定长线程池-newFixedThreadPool
创建源码:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
每提交一个任务创建一个线程,直到达到最大固定线程数。
使用的工作队列是new LinkedBlockingQueue()也就是工作队列是无限的,最好设置固定大小。
2.3.2 可缓存线程池-newCachedThreadPool
创建源码:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
可以灵活创建线程,无大小限制,当线程过多会进行回收,默认60s未使用进行销毁。
2.3.3 单线程化的newSingleThreadExecutor
创建源码:
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
使用唯一的工作线程处理任务,如果该线程异常结束,将会重新创建一个新的线程。
2.3.4 定时的线程池-newScheduledThreadPool
创建源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
可见使用了延迟工作队列,支持定时和周期性的执行任务。
2.4 Executor生命周期
Executor本身并没有解决生命周期问题,它的子接口ExecutorService提供了一些接口用于解决这个问题:
上述方法揭示了生命周期有三种类型:运行、关闭和终止。
shutdown:停止接受新的任务,会等待正在执行和队列中已经提交的任务完成。
shutdownNow:强制关闭,取消执行中和队列中的任务,返回队列中未执行的任务。
支持关闭操作的WebServer示例:
public class LifeCycleTaskExecutor {private int NTHREAD = 10;private ExecutorService EXECUTOR = new ThreadPoolExecutor(NTHREAD, NTHREAD, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100));public static void main(String[] args) {LifeCycleTaskExecutor lifeCycleTaskExecutor = new LifeCycleTaskExecutor();try {ServerSocket socket = new ServerSocket(8000);while (!lifeCycleTaskExecutor.EXECUTOR.isShutdown()) {Socket accept = socket.accept();lifeCycleTaskExecutor.EXECUTOR.execute(() -> {lifeCycleTaskExecutor.handlerAccept(accept);});}} catch (IOException e ) {e.printStackTrace();} catch (RejectedExecutionException e) {if (!lifeCycleTaskExecutor.EXECUTOR.isShutdown()) {System.out.println("任务提交失败");}}}private void handlerAccept(Socket accept) {try {InputStream inputStream = accept.getInputStream();if (getRequest(inputStream) == "CLOSE") {//关闭连接请求EXECUTOR.shutdown();} else {System.out.println("处理请求");}} catch (IOException e) {e.printStackTrace();}}private String getRequest(InputStream inputStream) {return null;}private void stopExecutor() {EXECUTOR.shutdown();}
}
利用ExecutorService提供的生命周期管理方法进行处理。
2.5 延迟的、周期性的任务
之前我们使用Timer来执行延迟或者周期任务,但是Timer单线程执行所有任务,很耗时,同时Timer对未受检的异常,没有进行捕获,导致timer线程终止,这个问题叫做"线程泄露"。
如下示例demo:
public class ScheduleTimerExample {public static void main(String[] args) {Timer timer = new Timer();timer.schedule(new TimerTaskExample(), 1);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}timer.schedule(new TimerTaskExample(), 1);try {Thread.sleep(6000);} catch (InterruptedException e) {e.printStackTrace();}}static class TimerTaskExample extends TimerTask {@Overridepublic void run() {throw new RuntimeException();}}
}
运行结果:
可见在执行任务时如果抛出未捕获异常,将导致线程关闭,再次执行任务将会出现问题。
建议:对于延迟、周期任务应该考虑使用ScheduledThreadPoolExecutor
3 寻找可强化的并发性-以页面渲染器为例
页面渲染器分为两块:
- 文字渲染。
- 图像下载和渲染。
3.1 顺序执行的页面渲染器
顺序按照以下步骤执行:
- 渲染文字。
- 下载图片。
- 渲染图片。
示例demo:
public class RenderPageExample {private ExecutorService executorService;public RenderPageExample(ExecutorService executorService) {this.executorService = executorService;}public static void main(String[] args) {RenderPageExample renderPageExample = new RenderPageExample(Executors.newFixedThreadPool(4));TextAndImagesSource textAndImagesSource = new TextAndImagesSource();textAndImagesSource.setText(new String[]{"123", "233", "333", "333", "333"});Image[] images = new Image[]{new Image() , new Image(), new Image() , new Image()};textAndImagesSource.setImages(images);renderPageExample.orderRenderPage(textAndImagesSource);}/*** 顺序处理渲染过程*/public void orderRenderPage(TextAndImagesSource textAndImagesSource) {//渲染文字renderText(textAndImagesSource.getText());List<Image> imageList = new ArrayList<>();for (Image image: textAndImagesSource.getImages()) {imageList.add(downloadImage(image));}for (Image image : imageList) {renderImage(image);}}private void renderText(String[] str) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("渲染文字");}private Image downloadImage(Image image) {try {Thread.sleep(2000);System.out.println("下载图片");} catch (InterruptedException e) {e.printStackTrace();}return new Image();}private void renderImage(Image image) {System.out.println("渲染图片");}@Data@AllArgsConstructor@NoArgsConstructor@Builderstatic class TextAndImagesSource {private String[] text;private Image[] images;}
}
运行结果:
但是此种方式导致未充分利用资源,比如下载图像依赖IO,此时线程阻塞,cpu无事可做。
3.2 使用Future实现页面渲染器
为了实现更高并发性,我们可以将渲染分为两个任务:
- 渲染文本。
- 下载所有图像。
示例代码demo如下:
public class RenderPageExample {private ExecutorService executorService;public RenderPageExample(ExecutorService executorService) {this.executorService = executorService;}public static void main(String[] args) {RenderPageExample renderPageExample = new RenderPageExample(Executors.newFixedThreadPool(4));TextAndImagesSource textAndImagesSource = new TextAndImagesSource();textAndImagesSource.setText(new String[]{"123", "233", "333", "333", "333"});Image[] images = new Image[]{new Image() , new Image(), new Image() , new Image()};textAndImagesSource.setImages(images);renderPageExample.concurrentRederPage(textAndImagesSource);}/*** 并发处理文字渲染和图像下载* @param textAndImagesSource*/public void concurrentRederPage(TextAndImagesSource textAndImagesSource) {Future<List<Image>> future = executorService.submit(() -> {List<Image> imageList = new ArrayList<>();for (Image image : textAndImagesSource.getImages()) {imageList.add(downloadImage(image));}return imageList;});renderText(textAndImagesSource.getText());try {List<Image> imageList = future.get();for (Image image : imageList) {renderImage(image);}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} finally {executorService.shutdown();}}private void renderText(String[] str) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("渲染文字");}private Image downloadImage(Image image) {try {Thread.sleep(2000);System.out.println("下载图片");} catch (InterruptedException e) {e.printStackTrace();}return new Image();}private void renderImage(Image image) {System.out.println("渲染图片");}@Data@AllArgsConstructor@NoArgsConstructor@Builderstatic class TextAndImagesSource {private String[] text;private Image[] images;}
}
处理过程为:
- 通过利用ExecutorService提交一个可获取结果的任务,也就是子线程下载所有图像。
- 之后主线程执行渲染文字。
- 通过Future的get获取下载结果进行渲染。
执行结果如下:
但是这个并发情况下,需要下载完所有图像,用户才可以看到渲染的图像。
3.3 使用CompletionService进行页面渲染
我们可以考虑使用CompletionService
,每需要下载一个图像,就开启一个独立任务执行,并在主线程执行获取结果,这样每次下载好一个图像,就可以进行渲染。
CompletionService
主要是重写了FutureTask
的done
方法,这样就可以在执行完成后保存结果到CompletionService
的阻塞队列completionQueue
,这样就可以异步获取结果了。
示例demo:
public class RenderPageExample {private ExecutorService executorService;public RenderPageExample(ExecutorService executorService) {this.executorService = executorService;}public static void main(String[] args) {RenderPageExample renderPageExample = new RenderPageExample(Executors.newFixedThreadPool(4));TextAndImagesSource textAndImagesSource = new TextAndImagesSource();textAndImagesSource.setText(new String[]{"123", "233", "333", "333", "333"});Image[] images = new Image[]{new Image() , new Image(), new Image() , new Image()};textAndImagesSource.setImages(images);renderPageExample.completionServiceRenderPage(textAndImagesSource);}/*** 使用CompletionService完成下载一个图像,渲染一个图像* @param textAndImagesSource*/public void completionServiceRenderPage(TextAndImagesSource textAndImagesSource) {Image[] images = textAndImagesSource.getImages();CompletionService<Image> completionService = new ExecutorCompletionService<>(executorService);for (Image image: images) {completionService.submit(() -> downloadImage(image));}renderText(textAndImagesSource.getText());try {for (int i = 0; i < images.length; i++) {renderImage(completionService.take().get());}} catch (ExecutionException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} finally {executorService.shutdown();}}private void renderText(String[] str) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("渲染文字");}private Image downloadImage(Image image) {try {int rand = new Random().nextInt(4000);Thread.sleep(rand);System.out.println("下载图片");} catch (InterruptedException e) {e.printStackTrace();}return new Image();}private void renderImage(Image image) {System.out.println("渲染图片");}@Data@AllArgsConstructor@NoArgsConstructor@Builderstatic class TextAndImagesSource {private String[] text;private Image[] images;}
}
处理过程为:
- 创建CompletionService,将ExecutorService线程池作为传入参数。
- 对每个图像,创建一个任务,由CompletionService提交任务。
- 主线程渲染文字。
- 主线程,获取下载结果,执行渲染,这里使用了阻塞队列。
执行结果:
可见此时是每次一有图像下载完,就会放入阻塞队列,这样主线程就可以获取结果进行渲染了。
参考文献
[1]. 《JAVA并发编程实战》.
JAVA并发编程实战-任务执行相关推荐
- Java并发编程实战————Executor框架与任务执行
引言 本篇博客介绍通过"执行任务"的机制来设计应用程序时需要掌握的一些知识.所有的内容均提炼自<Java并发编程实战>中第六章的内容. 大多数并发应用程序都是围绕&qu ...
- java 多线程缓存_[Java教程]【JAVA并发编程实战】12、使用condition实现多线程下的有界缓存先进先出队列...
[Java教程][JAVA并发编程实战]12.使用condition实现多线程下的有界缓存先进先出队列 0 2016-11-29 17:00:10 package cn.study.concurren ...
- Java并发编程实战————恢复中断
中断是一种协作机制,一个线程不能强制其他线程停止正在执行的操作而去执行其他操作. 什么是中断状态? 线程类有一个描述自身是否被中断了的boolean类型的状态,可以通过调用 .isInterrupte ...
- Java并发编程实战————Semaphore信号量的使用浅析
引言 本篇博客讲解<Java并发编程实战>中的同步工具类:信号量 的使用和理解. 从概念.含义入手,突出重点,配以代码实例及讲解,并以生活中的案例做类比加强记忆. 什么是信号量 Java中 ...
- Java并发编程实战_不愧是领军人物!这种等级的“Java并发编程宝典”谁能撰写?...
前言 大家都知道并发编程技术就是在同一个处理器上同时的去处理多个任务,充分的利用到处理器的每个核心,最大化的发挥处理器的峰值性能,这样就可以避免我们因为性能而产生的一些问题. 大厂的核心负载肯定是非常 ...
- java并发编程实战学习(3)--基础构建模块
转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put ...
- java单线程共享,「Java并发编程实战」之对象的共享
前言 本系列博客是对<Java并发编程实战>的一点总结,本篇主要讲解以下几个内容,内容会比较枯燥.可能大家看标题不能能直观的感受出到底什么意思,这就是专业术语,哈哈,解释下,术语(term ...
- 前置条件,不变性条件,后置条件 --《java并发编程实战》
阅读<java并发编程实战>4.1.1章 收集同步需求时, 反复出现了"不变性条件","不可变条件","后验条件",令我一头雾水 ...
- Java并发编程实战--FutureTask
FutureTask也可以用作闭锁.(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算.FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的R ...
最新文章
- lambda^k/k! 积分
- 深入XP之认识的引导文件NTLDR
- php json设置编码,php实现json编码的方法,phpjson编码
- es6 依赖循环_探索 JavaScript 中的依赖管理及循环依赖
- 山寨SaaS--管理软件夜未眠(五)
- 文巾解题 203. 移除链表元素
- python存储和读取数据时出现错误_python读取json文件存sql及codecs读取大文件问题...
- Servlet 工程 web.xml 中的 servlet 和 servlet-mapping 标签
- 十、vue-router学习笔记——认识路由、vue-router基本使用、vue-router嵌套路由、vue-router参数传递、vue-router导航守卫、keep-alive
- Centos7 安装pyhton3.7.4
- AI学会了视觉推理,“脑补”看不清的物体 | 李佳李飞飞等的CVPR论文
- 第3次作业:阅读《构建之法》1-5章
- 自动驾驶 6-2: 几何横向控制Lesson 2: Geometric Lateral Control - Pure Pursuit
- Java实战项目推荐(包括微服务、电商、支付项目、后台管理系统等)!
- 安装旧版本Xcode——MACOS
- 强烈推荐:20款优秀的数据可视化工具
- 如何根据函数,绘制出函数的图像
- python剔除st股_如何判断某只股票过去是不是ST股
- jupyther_python基础系列06第六章 函数 面向过程的编程
- 重庆高校在线平台的计算机基础,重庆高校在线开放课程建设数字课程建设标准.pdf...
热门文章
- 自动采集高清壁纸网站源码 二次美化版
- 华为机试真题 Java 实现【查找众数及中位数】
- XFS-新一代分布式文件系统-白皮书
- 【5G系列】AS层的网络选择(3)——小区重选标准
- 5G专网核心网部署模式与挑战
- IPC网络高清摄像机基础知识2(安霸半导体公司产品介绍 “来自2016年”)
- 为啥人类从1970年1月1日开始计时?
- python求峰面积_大学慕课2020年用Python玩转数据答案大全
- 小程序超出文字显示为省略号(代码简单的无话可说)
- [从头读历史] 第247节 夏商与西周时期的全面解读