定义

线程是一个重量级的对象,应该避免频繁创建和销毁。


class XXXPool{// 获取池化资源XXX acquire() {}// 释放池化资源void release(XXX x){}
}

线程池是一种生产者 - 消费者模式


//采用一般意义上池化资源的设计方法
class ThreadPool{// 获取空闲线程Thread acquire() {}// 释放线程void release(Thread t){}
}
//期望的使用
ThreadPool pool;
Thread T1=pool.acquire();
//传入Runnable对象
T1.execute(()->{//具体业务逻辑......
});//简化的线程池,仅用来说明工作原理
class MyThreadPool{//利用阻塞队列实现生产者-消费者模式BlockingQueue<Runnable> workQueue;//保存内部工作线程List<WorkerThread> threads = new ArrayList<>();// 构造方法MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){this.workQueue = workQueue;// 创建工作线程for(int idx=0; idx<poolSize; idx++){WorkerThread work = new WorkerThread();work.start();threads.add(work);}}// 提交任务void execute(Runnable command){workQueue.put(command);}// 工作线程负责消费任务,并执行任务class WorkerThread extends Thread{public void run() {//循环取任务并执行while(true){ ①Runnable task = workQueue.take();task.run();} }}
}/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(10, workQueue);
// 提交任务
pool.execute(()->{System.out.println("hello");
});

在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环

ThreadPoolExecutor


ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
workQueue:工作队列,和上面示例代码的工作队列同义。threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy:提交任务的线程自己去执行该任务。
**AbortPolicy:**默认的拒绝策略,会 throws RejectedExecutionException。
**DiscardPolicy:**直接丢弃任务,没有任何异常抛出。DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。


try {//业务逻辑
} catch (RuntimeException x) {//按需处理
} catch (Throwable x) {//按需处理
}

如何获取任务执行结果


// 提交Runnable任务
Future<?> submit(Runnable task);
// 提交Callable任务
<T> Future<T> submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);

,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。


// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

烧水壶


// 创建任务T2的FutureTask
FutureTask<String> ft2= new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1= new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@OverrideString call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶  String tf = ft2.get();System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {@OverrideString call() throws Exception {System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});
//等待任务3执行结果
System.out.println(f3.join());void sleep(int t, TimeUnit u) {try {u.sleep(t);}catch(InterruptedException e){}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了runAsync(Runnable runnable)和supplyAsync(Supplier supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。


//使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

询问最低报价


// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
// 并计算最低报价
AtomicReference<Integer> m =new AtomicReference<>(Integer.MAX_VALUE);
for (int i=0; i<3; i++) {executor.execute(()->{Integer r = null;try {r = cs.take().get();} catch (Exception e) {}save(r);m.set(Integer.min(m.get(), r));});
}
return m;

Executor与线程池相关推荐

  1. Android并发之Executor(线程池)家族(二)之AtomicInteger

    线程并发,那就牵扯到内存共享的问题,在并发编程中,有三个理念:原子性.可见性.有序性.这里分享一个转载. 转载:Java volatile关键字最全总结:原理剖析与实例讲解(简单易懂) 在Thread ...

  2. Day841.Executor与线程池-Java 并发编程实战

    Executor与线程池 Hi,我是阿昌,今天学习记录的是关于Executor与线程池. 虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但 ...

  3. 戏(细)说Executor框架线程池任务执行全过程(上)

    原文链接   归档下发表于infoq.com 2015年6月的两篇文章. 内容综述 基于Executor接口中将任务提交和任务执行解耦的设计,ExecutorService和其各种功能强大的实现类提供 ...

  4. java executor spring_java 线程池(ExecutorService与Spring配置threadPoolTaskExecutor)

    一.java ExecutorService实现 创建ExecutorService变量 private ExecutorService executor = null 2.执行对应任务时,首先生成线 ...

  5. Java 并发 —— Thread、Executor、线程池

    Java 线程池: ThreadPoolExecutor,创建此线程池的方法: Executors.newFixedThreadPool(): Executors.newCachedThreadPoo ...

  6. Executor框架线程池参数配置原则

    线程池都经常用,但是具体的参数和具体的参数设置一定要知道,否会任务拒绝或者多线程上下文切换频繁: 高并发尽量不要用java提供的FixedThreadPool和SingleThreadExecutor ...

  7. Java多线程 线程池Executor框架

    目录 一.说明 二.理解 Executor ExecutorService Executors 三.实现 1. newSingleThreadExecutor 2. newFixedThreadPoo ...

  8. 一文搞懂线程池原理——Executor框架详解

    文章目录 1 使用线程池的好处 2 Executor 框架 2.1 Executor 框架结构 2.2 Executor 框架使用示意图 2.3 Executor 框架成员 2.3.1 Executo ...

  9. spark源码(四)executor在worker上的创建过程,executor本质是什么,是线程池吗?

    上篇文章我们讲解了master调度driver和executor(application)的过程,但是对于executor在worker上的创建过程没有讲,这里我们接着上篇文章继续讲. 首先我们从wo ...

  10. java线程池的概念_Java线程池的基本概念以及生命周期

    一.为什么要实现线程池? 线程的创建与销毁对于CPU而言开销较大,通过池化技术可避免重复的创建与销毁线程. 方便与线程资源统一管理. 二.几种常见的线程池以及核心参数 不推荐使用Executor创建线 ...

最新文章

  1. 分布式事务+DDD+负载均衡+服务治理已撸!微服务不就这点事?
  2. java程序无法连接redis 正常启动但是无法访问
  3. mysql中FIND_IN_SET的使用方法
  4. Android 实现边听边录音探究
  5. 在局域网可以访问电脑中存放的网页(IIS服务)
  6. stackoverflow_Stackoverflow:您尚未发现的7个最佳Java答案
  7. git 上传项目到linux仓库_总结:上传python项目至git上前的一些准备工作
  8. java 根据星期计算日期_Java 根据指定日期计算所在周的周一和周日
  9. android viewpager 不同页面底部菜单不同,viewpager不同页面中的不同菜单图标
  10. 多边多面形成体_Nature Comm | 中科院分子植物卓越中心巫永睿团队揭示类胡萝卜素影响玉米硬质胚乳形成的新机制...
  11. c语言周信东实验答案,桂林电子科技大学-C语言-程序设计-习题-答案(周信东)-实验4--数-组...
  12. Asp.net-MyFirstMVCProject详细解释
  13. V8声卡软件调试教程
  14. 【基础】杨辉三角python题解
  15. openCV 下载地址
  16. Linux 配置网络桥接模式
  17. 火车头采集器用法说明
  18. FileReader 文件操作
  19. 非核心版本的计算机上_计算机四级网络工程师知识点笔记(备考指南)
  20. 24V低压检测电路 - 低压检测电压

热门文章

  1. 语义化版本号 Sematic Versioning
  2. logogo.exe威金变种病毒
  3. chromecast 协议_Chromecast和Android TV有什么区别?
  4. 【python】自动化测试浏览器不关闭
  5. c语言 实验报告:分支结构程序设计
  6. EXCEL电子表格的基本操作
  7. python使用requests库爬取淘宝评论
  8. python爬取苏宁易购--jsonpath方法
  9. 树莓派安装系统和系统备份还原
  10. 思科路由器:学会看路由表信息,show ip route详解