目录

分工问题

线程池

Java线程池的基本用法

线程池添加线程的逻辑

线程池的重要参数:工作队列

线程池的重要参数:拒绝策略

创建线程池的快捷方法

线程数量

线程池使用原则

获取线程执行结果 - Future

终止线程池

CompletableFuture - 多线程异步编程

CompletionService - 多线程批量执行异步任务

Fork/Join - 多线程分治任务


分工问题

分工问题指如何高效地拆解任务,并分配给线程;

解决分工问题的方案,主要是靠多线程和相关操作来解决;

多线程的并行、聚合、批量,分治操作;

常规的并行任务,可以通过线程池+Future的方案来解决;

任务之间有聚合关系,AND、OR聚合,可以通过 CompletableFuture来解决;

任务批量并行,可以通过 CompletionService 来解决;

分治操作,使用Fork/Join。

线程池

Java中为了更好的管理多线程,一般使用线程池;

线程池是一种生产者 - 消费者模式;

线程池的使用方是生产者,线程池本身是消费者;

线程池能避免线程频繁创建、销毁的问题,而且能够限制线程的最大数量。

Java线程池的基本用法

public ThreadPoolExecutor(int corePoolSize, //表示线程池保有的最小线程数int maximumPoolSize, //表示线程池创建的最大线程数。long keepAliveTime, //如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。TimeUnit unit,BlockingQueue<Runnable> workQueue, //工作队列ThreadFactory threadFactory, //通过这个参数你可以自定义如何创建线程,例如可以给线程指定一个有意义的名字。RejectedExecutionHandler handler) //通过这个参数可以自定义任务的拒绝策略

线程池添加线程的逻辑

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) { //1,线程数比corePoolSize数量少,创建新线程执行任务;if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { //2,线程数达到corePoolSize,把任务缓存到队列中;int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false)) //3,任务队列已满,线程数还没达到maximumPoolSize数量,创建新建仓执行任务,否则执行拒绝策略;reject(command);
}

当有新任务到来时,线程池添加线程的逻辑:

1,线程数比corePoolSize数量少,创建新线程执行任务;

2,线程数达到corePoolSize,把任务缓存到队列中;

3,任务队列已满,线程数还没达到maximumPoolSize数量,创建新建仓执行任务,否则执行拒绝策略;

线程池的重要参数:工作队列

BlockingQueue 是双缓冲队列。BlockingQueue 允许两个线程同时向队列一个存储,一个取

出操作。在保证并发安全的同时,提高了队列的存取效率。

1,ArrayBlockingQueue:规定大小的 BlockingQueue,其构造必须指定大小。其所含的对象

是 FIFO 顺序排序的。

2,LinkedBlockingQueue:大小不固定的 BlockingQueue,若其构造时指定大小,生成的

BlockingQueue 有大小限制,不指定大小,其大小有 Integer.MAX_VALUE 来决定。其所含的对象

是 FIFO 顺序排序的。

3,PriorityBlockingQueue:类似于 LinkedBlockingQueue,但是其所含对象的排序不是

FIFO,而是依据对象的自然顺序或者构造函数的 Comparator 决定。

4,SynchronizedQueue:特殊的 BlockingQueue,对其的操作必须是放和取交替完成。

线程池的重要参数:拒绝策略

1,ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出 RejectedExecutionException异常;

2,ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常;

3,ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝

的任务;

4,ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务;

创建线程池的快捷方法

调用Executors类的静态方法;

1,newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行

所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所

有任务的执行顺序按照任务的提交顺序执行。

2,newFixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大

小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池

会补充一个新线程。

3,newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收

部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理

任务。

此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创

建的最大线程大小。

4,newScheduledThreadPool

创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。

线程数量

如果是 CPU 密集型应用:线程的数量 =CPU 核数

如果是 IO 密集型应用:1 +(I/O 耗时 / CPU 耗时)* CPU 核数

I/O 耗时 / CPU 耗时可以使用2作为初始值;

以理论值为起始点,通过压测,动态调整线程数;

线程池使用原则

建议使用有界队列,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都

无法处理,这是致命问题。

建议在创建线程池时,清晰地指明拒绝策略。

建议在实际工作中给线程赋予一个业务相关的名字。

有依赖关系的任务,需要为不同的任务创建不同的线程池,如果使用同一个线程池容易导致

线程死锁。

获取线程执行结果 - Future

ThreadPoolExecutor的3个 submit()方法;

// 提交Runnable任务Future submit(Runnable task);// 提交Callable任务Future submit(Callable task);// 提交Runnable任务及结果引用Future submit(Runnable task, T result);

1,提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,

Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future

仅可以用来断言任务已经结束了,类似于 Thread.join()。

2,提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有

一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其

get() 方法来获取任务的执行结果。

3,提交 Runnable 任务和结果引用 submit(Runnable task, T result):这个方法返回的 Future

对象 f,f.get() 的返回值就是传给 submit() 方法的参数 result。result 相当于主线程和子线程之间的

桥梁,通过它主子线程可以共享数据。

Future 接口的get方法:

获得任务执行结果的 get() , get(timeout, unit);

两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法

的线程会阻塞,直到任务执行完才会被唤醒。

终止线程池

线程池提供了两个方法:shutdown()和shutdownNow()

线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和

已经进入阻塞队列的任务都执行完之后才最终关闭线程池。

线程池执行 shutdownNow() 后,会拒绝接收新的任务,同时还会中断线程池中正在执行的任

务,已经进入阻塞队列的任务也被剥夺了执行的机会,不过这些被剥夺执行机会的任务会作为

shutdownNow() 方法的返回值返回。

如果提交到线程池的任务不允许取消,那就不能使用 shutdownNow() 方法终止线程池。如果

提交到线程池的任务允许后续以补偿的方式重新执行,也是可以使用 shutdownNow() 方法终止线

程池的。

shutdown()和shutdownNow()组合使用:

调用shutdown()停止接受任务,等待已有任务执行完毕;

等待一段较长时间,调用boolean awaitTermination(timeOut, unit),判断线程池中剩余任务是

否都执行完毕;如果没有执行完毕,调用shutdownNow()强制关闭;

CompletableFuture - 多线程异步编程

CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ //步骤1 });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{return "步骤2";});
CompletableFuture<String> f3 = f1.thenCombine(f2, (f1Result, f2Result)->{return "步骤3";});

步骤1,步骤2并发执行,都执行完成之后,执行步骤3;

CompletableFuture 类还实现了 CompletionStage 接口,CompletionStage 接口可以描述串

行关系、AND 聚合关系、OR 聚合关系以及异常处理。

CompletionService - 多线程批量执行异步任务

        使用阻塞队列实现批量执行异步任务;

// 创建阻塞队列BlockingQueue bq = new LinkedBlockingQueue<>();executor.execute(()->{ return "步骤1" });executor.execute(()->{ return "步骤2" });executor.execute(()->{ return "步骤3" });for (int i=0; iInteger r = bq.take();executor.execute(()->handle(r));}

CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执

行结果加入到阻塞队列中, CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列

中。

        使用CompletionService 实现批量执行异步任务;

// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService cs = new ExecutorCompletionService<>(executor);cs.submit(()->{ return "步骤1" });cs.submit(()->{ return "步骤2" });cs.submit(()->{ return "步骤3" });for (int i=0; iInteger r = cs.take().get();executor.execute(()->handle(r));}

Fork/Join - 多线程分治任务

分治任务模型可分为两个阶段:

1,任务分解,将任务迭代地分解为子任务,直至子任务可以直接计算出结果;

2,结果合并,逐层合并子任务的执行结果,直至获得最终结果。

Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。

Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是

分治任务 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中

fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。

ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask。

RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是

有返回值的。

static void main(String[] args){//创建分治任务线程池ForkJoinPool fjp = new ForkJoinPool(4);//创建分治任务Fibonacci fib = new Fibonacci(30);//启动分治任务Integer result = fjp.invoke(fib);//输出结果System.out.println(result);}//递归任务static class Fibonacci extends RecursiveTask{final int n;Fibonacci(int n){this.n = n;}protected Integer compute(){if (n <= 1)return n;Fibonacci f1 = new Fibonacci(n - 1);//创建子任务f1.fork();Fibonacci f2 = new Fibonacci(n - 2);//等待子任务结果,并合并结果return f2.compute() + f1.join();}}

Java并发编程(二)- 分工相关推荐

  1. Java 并发编程(二):如何保证共享变量的原子性?

    线程安全性是我们在进行 Java 并发编程的时候必须要先考虑清楚的一个问题.这个类在单线程环境下是没有问题的,那么我们就能确保它在多线程并发的情况下表现出正确的行为吗? 我这个人,在没有副业之前,一心 ...

  2. Java并发编程(二十三)------并发设计模式之生产者消费者模式

    参考文章:Java实现生产者消费者问题与读者写者问题 目录 1. 生产者消费者问题 1.1 wait() / notify()方法 1.2 await() / signal()方法 1.2.1 对sy ...

  3. java面试-Java并发编程(二)——重排序

    当我们写一个单线程程序时,总以为计算机会一行行地运行代码,然而事实并非如此. 什么是重排序? 重排序指的是编译器.处理器在不改变程序执行结果的前提下,重新排列指令的执行顺序,以达到最佳的运行效率. 重 ...

  4. java并发编程(二十六)——单例模式的双重检查锁模式为什么必须加 volatile?

    前言 本文我们从一个问题出发来进行探究关于volatile的应用. 问题:单例模式的双重检查锁模式为什么必须加 volatile? 什么是单例模式 单例模式指的是,保证一个类只有一个实例,并且提供一个 ...

  5. [Java并发编程(二)] 线程池 FixedThreadPool、CachedThreadPool、ForkJoinPool?为后台任务选择合适的 Java executors...

    [Java并发编程(二)] 线程池 FixedThreadPool.CachedThreadPool.ForkJoinPool?为后台任务选择合适的 Java executors ... 摘要 Jav ...

  6. java并发编程(二十一)----(JUC集合)CopyOnWriteArraySet和ConcurrentSkipListSet介绍

    转载自  java并发编程(二十一)----(JUC集合)CopyOnWriteArraySet和ConcurrentSkipListSet介绍 这一节我们来接着介绍JUC集合:CopyOnWrite ...

  7. 【Java并发编程】之二:线程中断

    [Java并发编程]之二:线程中断 使用interrupt()中断线程 ​ 当一个线程运行时,另一个线程可以调用对应的Thread对象的interrupt()方法来中断它,该方法只是在目标线程中设置一 ...

  8. # Java 并发编程的艺术(二)

    Java 并发编程的艺术(二) 文章目录 Java 并发编程的艺术(二) 并发编程的挑战 上下文切换 如何减少上下文的切换 死锁 资源限制的挑战 Java 并发机制的底层实现原理 volatile 的 ...

  9. java并发编程实战(二)

    java并发编程中常常会用到两种容器来存放一些数据,这些数据需要保证能在多线程下正常访问.常见的容器分为两类:同步容器和并发容器.在java并发编程实战一书中的第五章也有讲解. 什么是同步容器以及优劣 ...

  10. 简明高效的 Java 并发编程学习指南

    你好,我是宝令,<Java 并发编程实战>专栏作者,很高兴你能看到这篇内容. 对于一个Java程序员而言,能否熟练掌握并发编程是判断他优秀与否的重要标准之一.因为并发编程是Java语言中最 ...

最新文章

  1. numpy 中的 squeeze() 函数
  2. Ts + React + Mobx 实现移动端浏览器控制台
  3. 从unmarshal带json字符串字段的json说起
  4. Sublime Text2,跨平台神级编辑器乱码问题解决
  5. 谁来理解外来工的孩子的心理健康?
  6. 在.net 4.0程序中使用TPL Dataflow
  7. Apache JMeter 测试webservice接口
  8. [Web Chart系列之五] 5. 实战draw2d之figure tooltip 实现
  9. AndroidOpenCV摄像头预览全屏问题
  10. OpenJDK8 272在MIPS上的编译修改记录
  11. JAVA实现UNIX文件管理系统
  12. [开源] PLC梯形图转指令表的算法源代码
  13. 工作一年时期的土豆总结——复杂度和困难度
  14. JavaScript throw 语句
  15. 华为鸿蒙跑了个“hello world”!跑通后,我特么开始怀疑人生....
  16. java毕业设计花漾网在线商城mybatis+源码+调试部署+系统+数据库+lw
  17. 【参赛时间延长】InterSystems技术写作大赛:Python
  18. 曲面积分(Surface Integral)
  19. android 免root冻结,自冻FreezeYou(超强免ROOT冻结神器)
  20. Nginx:正向代理与反向代理

热门文章

  1. 紫外线强度检测传感器GUVA-S12SD的应用
  2. 方差、协方差、四分位差笔记
  3. 狸猫哥哥和他的冬葵花
  4. echarts基础语法
  5. 玉米、水稻、甘蔗等农作物图片数据集
  6. 一种高超声速飞行器弹道的仿真方法
  7. 测试如何与开发人员进行沟通
  8. reshape 与 shape
  9. git 和 linux、_linux和git拔掉
  10. 计算机自考本科英语二可以用什么代替,自考英语二用什么可以代替免考