Java多线程系列 JUC线程池05 线程池原理解析(四)
转载 http://www.cnblogs.com/skywang12345/p/3544116.html https://blog.csdn.net/programmer_at/article/details/79799267
Executor执行Callable任务
Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。
1. Callable
Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。
Callable的源码如下:
public interface Callable<V> {V call() throws Exception; }
说明:从中我们可以看出Callable支持泛型。
2. Future
Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。
Future的源码如下:
public interface Future<V> {// 试图取消对此任务的执行。boolean cancel(boolean mayInterruptIfRunning)// 如果在任务正常完成前将其取消,则返回 true。boolean isCancelled()// 如果任务已完成,则返回 true。boolean isDone()// 如有必要,等待计算完成,然后获取其结果。V get() throws InterruptedException, ExecutionException;// 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }
说明: Future用于表示异步计算的结果。它的实现类是FutureTask,在讲解FutureTask之前,我们先看看Callable, Future, FutureTask它们之间的关系图,如下:
说明:
(01) RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:
public interface RunnableFuture<V> extends Runnable, Future<V> {void run(); }
(02) FutureTask实现了RunnableFuture接口。所以,我们也说它实现了Future接口。
示例和源码分析
我们先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。
import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException;class MyCallable implements Callable {@Override public Integer call() throws Exception {int sum = 0;// 执行任务for (int i=0; i<100; i++)sum += i;//return sum; return Integer.valueOf(sum);} }public class CallableTest1 {public static void main(String[] args) throws ExecutionException, InterruptedException{//创建一个线程池ExecutorService pool = Executors.newSingleThreadExecutor();//创建有返回值的任务Callable c1 = new MyCallable();//执行任务并获取Future对象 Future f1 = pool.submit(c1);// 输出结果 System.out.println(f1.get()); //关闭线程池 pool.shutdown(); } }
运行结果:
4950
结果说明:
在主线程main中,通过newSingleThreadExecutor()新建一个线程池。接着创建Callable对象c1,然后再通过pool.submit(c1)将c1提交到线程池中进行处理,并且将返回的结果保存到Future对象f1中。然后,我们通过f1.get()获取Callable中保存的结果;最后通过pool.shutdown()关闭线程池。
1. submit任务,等待线程池execute
1. 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
2. FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程;
在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
1. Callable接口类似于Runnable,只是Runnable没有返回值。
2. Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
3. Future.get方法会导致主线程阻塞,直到Callable任务执行完成;
1. submit()
submit()在ExecutorService.java中的定义:
<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);
submit()在AbstractExecutorService.java中实现,AbstractExecutorService.
submit()实现了ExecutorService.
submit(),并且可以获取执行完的返回值, 而ThreadPoolExecutor是AbstractExecutorService.
submit()的子类,所以submit方法也是ThreadPoolExecutor的方法,它的源码如下:
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 创建一个RunnableFuture对象RunnableFuture<T> ftask = newTaskFor(task);// 执行“任务ftask” execute(ftask);// 返回“ftask”return ftask; }
说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable); }
通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;
2. FutureTask的构造函数
FutureTask的内部状态及构造函数如下:
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();// callable是一个Callable对象this.callable = callable;// state记录FutureTask的状态this.state = NEW; // ensure visibility of callable } }
3. FutureTask的run()方法
我们继续回到submit()的源码中。
在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中实现,源码如下:
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 将callable对象赋值给c。Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 执行Callable的call()方法,并保存结果到result中。result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}// 如果运行成功,则将result保存if (ran)set(result);}} finally {runner = null;// 设置“state状态标记”int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);} }
说明:FutureTask.run方法是在线程池中被执行的,而非主线程
1. 通过执行Callable任务的call方法;
2. 如果call执行成功,则通过set方法保存结果,之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值;
3. 如果call执行有异常,则通过setException保存异常;
4. get方法
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s); }
内部通过awaitDone方法对主线程进行阻塞,具体实现如下:
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yet Thread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}
说明:
- 如果主线程被中断,则抛出中断异常;
- 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
- 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
- 通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
- 最终通过LockSupport的park或parkNanos挂起线程;
转载于:https://www.cnblogs.com/lizhouwei/p/9119074.html
Java多线程系列 JUC线程池05 线程池原理解析(四)相关推荐
- Java多线程系列--“JUC线程池”06之 Callable和Future
转载自 Java多线程系列--"JUC线程池"06之 Callable和Future Callable 和 Future 简介 Callable 和 Future 是比较有趣的一 ...
- Java多线程系列--“JUC锁”05之 非公平锁
转载自:http://www.cnblogs.com/skywang12345/p/3496651.html点击打开链接 概要 前面两章分析了"公平锁的获取和释放机制",这一章开始 ...
- Java多线程系列---“JUC锁”01之 框架
本章,我们介绍锁的架构:后面的章节将会对它们逐个进行分析介绍.目录如下: 01. Java多线程系列--"JUC锁"01之 框架 02. Java多线程系列--"JUC锁 ...
- Java多线程系列--“JUC锁”03之 公平锁(一)
概要 本章对"公平锁"的获取锁机制进行介绍(本文的公平锁指的是互斥锁的公平锁),内容包括: 基本概念 ReentrantLock数据结构 参考代码 获取公平锁(基于JDK1.7.0 ...
- Java多线程系列--“JUC原子类”01之 框架
2019独角兽企业重金招聘Python工程师标准>>> Java多线程系列--"JUC原子类"01之 框架 根据修改的数据类型,可以将JUC包中的原子操作类可以分 ...
- Java多线程系列--“JUC原子类”03之 AtomicLongArray原子类
概要 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray这3个数组类型的原子类的原理和用法相似.本章以AtomicLongArray对数 ...
- Java多线程系列 JUC线程池01 线程池框架
转载 http://www.cnblogs.com/skywang12345/p/3509903.html 为什么引入Executor线程池框架 new Thread()的缺点 1. 每次new T ...
- Java多线程系列--“基础篇”10之 线程优先级和守护线程
概要 本章,会对守护线程和线程优先级进行介绍.涉及到的内容包括: 1. 线程优先级的介绍 2. 线程优先级的示例 3. 守护线程的示例 转载请注明出处:http://www.cnblogs.com/s ...
- Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例
CyclicBarrier简介 CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).因为该 barrier 在释放等 ...
最新文章
- 设计模式复习-策略模式
- 0中断优先级_西门子S7-200 SMART中断及中断指令概述
- 如何自定义SAP Spartacus的路由路径
- oracle一体机高水位,oracle 移动高水位:
- 机器学习线性回归算法实验报告_从零实现机器学习算法(九)线性回归
- 如何理解 ListT和 DictionaryK,V 的扩容机制 ?
- 软件工程概论课后作业01
- 配置hiveserver2访问hive
- Redmi 卢伟冰:要做就做真旗舰 绝不通过简配压低成本
- 谷歌大脑AutoML最新进展:不断进化的阿米巴网络
- 吴恩达神经网络和深度学习-学习笔记-6-训练集、验证集和测试集 + 偏差bias和方差variance
- [机器学习] ——KNN K-最邻近算法
- C Primer Plus学习笔记(二)
- js判断是对象还是集合
- 定制10kV变压器感应雷直击雷击变压器加避雷器atp-emtp模型
- SqlServerSQL语句方式--视图创建
- 单核CPU vs. 多核CPU
- MSDC 4.3 接口规范(23)
- Hook函数三步走(SetWindowsHookEx、UnhookWindowsHookEx、CallNextHookEx)
- 云虚拟主机数据库连接和url重写
热门文章
- matplotlib输出图形到网页_必学python库Matplotlib教程分享
- centos selinux_如何临时或永久地禁用SELinux
- _Linux系统编程—信号集操作函数
- Roberta-wwm-ext-large模型中的wwm理解
- gbdt降低学习率可以实现正则化效果呢
- linux服务器挂载不上nfs,我遇上的挂载不上NFS文件系统的坑
- struts 修改拦截器修改返回值_关于struts2简单的介绍与示例
- 论文,成本管理与进度管理(主成本)
- PHP中文乱码的常见解决方法总结
- CentOS中怎样解压rar文件