java future_Java并发编程之异步Future机制的原理和实现
Java并发编程之异步Future机制的原理和实现
项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class AddTask implements Callable {
private int a,b;
public AddTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
Integer result = a + b;
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
//JDK目前为止返回的都是FutureTask的实例
Future future = executor.submit(new AddTask(1, 2));
Integer result = future.get();// 只有当future的状态是已完成时(future.isDone() = true),get()方法才会返回
}
}
虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:
public interface Future {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:
package future;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* The result of an asynchronous operation.
*
* @author lixiaohui
* @param 执行结果的类型参数
*/
public interface IFuture extends Future {
boolean isSuccess(); // 是否成功
V getNow();//立即返回结果(不管Future是否处于完成状态)
Throwable cause();//若执行失败时的原因
boolean isCancellable(); //是否可以取消
IFuture await() throws InterruptedException; //等待future的完成
boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成
boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
IFuture awaitUninterruptibly(); //等待future的完成,不响应中断
boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断
boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
IFuture addListener(IFutureListener l); //当future完成时,会通知这些加进来的监听器
IFuture removeListener(IFutureListener l);
}
接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:
public class Object {
/**
* Causes the current thread to wait until another thread invokes the
* {@link java.lang.Object#notify()} method or the
* {@link java.lang.Object#notifyAll()} method for this object.
* In other words, this method behaves exactly as if it simply
* performs the call {@code wait(0)}.
* 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify()/notifyAll()
*/
public final void wait() throws InterruptedException {
wait(0);
}
/**
* Wakes up all threads that are waiting on this object's monitor. A
* thread waits on an object's monitor by calling one of the
* {@code wait} methods.
*
* The awakened threads will not be able to proceed until the current
* thread relinquishes the lock on this object. The awakened threads
* will compete in the usual manner with any other threads that might
* be actively competing to synchronize on this object; for example,
* the awakened threads enjoy no reliable privilege or disadvantage in
* being the next thread to lock this object.
*/
public final native void notifyAll();
}
知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):
package future;
import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*
* 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL}
* 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例
* 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll()方法:
*
*
异步操作被取消时(cancel方法)
*
异步操作正常结束时(setSuccess方法)
*
异步操作异常结束时(setFailure方法)
*
*
*
* @author lixiaohui
*
* @param
* 异步执行结果的类型
*/
public class AbstractFuture implements IFuture {
protected volatile Object result; // 需要保证其可见性
/**
* 监听器集
*/
protected Collection> listeners = new CopyOnWriteArrayList>();
/**
* 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时,
* result引用该对象
*/
private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal();
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) { // 已完成了不能取消
return false;
}
synchronized (this) {
if (isDone()) { // double check
return false;
}
result = new CauseHolder(new CancellationException());
notifyAll(); // isDone = true, 通知等待在该对象的wait()的线程
}
notifyListeners(); // 通知监听器该异步操作已完成
return true;
}
@Override
public boolean isCancellable() {
return result == null;
}
@Override
public boolean isCancelled() {
return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
@Override
public boolean isDone() {
return result != null;
}
@Override
public V get() throws InterruptedException, ExecutionException {
await(); // 等待执行结果
Throwable cause = cause();
if (cause == null) { // 没有发生异常,异步操作正常结束
return getNow();
}
if (cause instanceof CancellationException) { // 异步操作被取消了
throw (CancellationException) cause;
}
throw new ExecutionException(cause); // 其他异常
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {// 超时等待执行结果
Throwable cause = cause();
if (cause == null) {// 没有发生异常,异步操作正常结束
return getNow();
}
if (cause instanceof CancellationException) {// 异步操作被取消了
throw (CancellationException) cause;
}
throw new ExecutionException(cause);// 其他异常
}
// 时间到了异步操作还没有结束, 抛出超时异常
throw new TimeoutException();
}
@Override
public boolean isSuccess() {
return result == null ? false : !(result instanceof CauseHolder);
}
@SuppressWarnings("unchecked")
@Override
public V getNow() {
return (V) (result == SUCCESS_SIGNAL ? null : result);
}
@Override
public Throwable cause() {
if (result != null && result instanceof CauseHolder) {
return ((CauseHolder) result).cause;
}
return null;
}
@Override
public IFuture addListener(IFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) { // 若已完成直接通知该监听器
notifyListener(listener);
return this;
}
synchronized (this) {
if (!isDone()) {
listeners.add(listener);
return this;
}
}
notifyListener(listener);
return this;
}
@Override
public IFuture removeListener(IFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (!isDone()) {
listeners.remove(listener);
}
return this;
}
@Override
public IFuture await() throws InterruptedException {
return await0(true);
}
private IFuture await0(boolean interruptable) throws InterruptedException {
if (!isDone()) { // 若已完成就直接返回了
// 若允许终端且被中断了则抛出中断异常
if (interruptable && Thread.interrupted()) {
throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted.");
}
boolean interrupted = false;
synchronized (this) {
while (!isDone()) {
try {
wait(); // 释放锁进入waiting状态,等待其它线程调用本对象的notify()/notifyAll()方法
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
}
}
if (interrupted) {
// 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的,
// 这里重新设置以便让其它代码知道这里被中断了。
Thread.currentThread().interrupt();
}
}
return this;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (isDone()) {
return true;
}
if (timeoutNanos <= 0) {
return isDone();
}
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
synchronized (this) {
if (isDone()) {
return true;
}
if (waitTime <= 0) {
return isDone();
}
for (;;) {
try {
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
if (isDone()) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
@Override
public IFuture awaitUninterruptibly() {
try {
return await0(false);
} catch (InterruptedException e) { // 这里若抛异常了就无法处理了
throw new java.lang.InternalError();
}
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
throw new java.lang.InternalError();
}
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
throw new java.lang.InternalError();
}
}
protected IFuture setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setFailure0(Throwable cause) {
if (isDone()) {
return false;
}
synchronized (this) {
if (isDone()) {
return false;
}
result = new CauseHolder(cause);
notifyAll();
}
return true;
}
protected IFuture setSuccess(Object result) {
if (setSuccess0(result)) { // 设置成功后通知监听器
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(Object result) {
if (isDone()) {
return false;
}
synchronized (this) {
if (isDone()) {
return false;
}
if (result == null) { // 异步操作正常执行完毕的结果是null
this.result = SUCCESS_SIGNAL;
} else {
this.result = result;
}
notifyAll();
}
return true;
}
private void notifyListeners() {
for (IFutureListener l : listeners) {
notifyListener(l);
}
}
private void notifyListener(IFutureListener l) {
try {
l.operationCompleted(this);
} catch (Exception e) {
e.printStackTrace();
}
}
private static class SuccessSignal {
}
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
}
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
package future.test;
import future.IFuture;
import future.IFutureListener;
/**
* 延时加法
* @author lixiaohui
*
*/
public class DelayAdder {
public static void main(String[] args) {
new DelayAdder().add(3 * 1000, 1, 2).addListener(new IFutureListener() {
@Override
public void operationCompleted(IFuture future) throws Exception {
System.out.println(future.getNow());
}
});
}
/**
* 延迟加
* @param delay 延时时长 milliseconds
* @param a 加数
* @param b 加数
* @return 异步结果
*/
public DelayAdditionFuture add(long delay, int a, int b) {
DelayAdditionFuture future = new DelayAdditionFuture();
new Thread(new DelayAdditionTask(delay, a, b, future)).start();
return future;
}
private class DelayAdditionTask implements Runnable {
private long delay;
private int a, b;
private DelayAdditionFuture future;
public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
super();
this.delay = delay;
this.a = a;
this.b = b;
this.future = future;
}
@Override
public void run() {
try {
Thread.sleep(delay);
Integer i = a + b;
// TODO 这里设置future为完成状态(正常执行完毕)
future.setSuccess(i);
} catch (InterruptedException e) {
// TODO 这里设置future为完成状态(异常执行完毕)
future.setFailure(e.getCause());
}
}
}
}
package future.test;
import future.AbstractFuture;
import future.IFuture;
//只是把两个方法对外暴露
public class DelayAdditionFuture extends AbstractFuture {
@Override
public IFuture setSuccess(Object result) {
return super.setSuccess(result);
}
@Override
public IFuture setFailure(Throwable cause) {
return super.setFailure(cause);
}
}
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
java future_Java并发编程之异步Future机制的原理和实现相关推荐
- Java高并发编程详解系列-线程池原理自定义线程池
之前博客的所有内容是对单个线程的操作,例如有Thread和Runnable的使用以及ThreadGroup等的使用,但是对于在有些场景下我们需要管理很多的线程,而对于这些线程的管理有一个统一的管理工具 ...
- 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )
文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...
- 【Java 并发编程】线程池机制 ( 线程池示例 | newCachedThreadPool | newFixedThreadPool | newSingleThreadExecutor )
文章目录 前言 一.线程池示例 二.newCachedThreadPool 线程池示例 三.newFixedThreadPool 线程池示例 三.newSingleThreadExecutor 线程池 ...
- Java 并发编程——Executor框架和线程池原理
Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...
- Java 7 并发编程指南
原文是发表在并发编程网上翻译后的 <Java 7 并发编程指南>,这里对其中的目录做个更加详细的描述,并且写出了重点说明,方便日后快速查阅.建议仔细查看每节的代码实现,非常具有参考价值.可 ...
- Java 多线程 并发编程
转载自 Java 多线程 并发编程 一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进 ...
- Java多线程并发编程
一.线程池 1.1.什么是线程池 线程池是一种多线程的处理方式,利用已有线程对象继续服务新的任务(按照一定的执行策略),而不是频繁地创建销毁线程对象,由此提高服务的吞吐能力,减少CPU的闲置时间.具体 ...
- Java高并发编程学习(三)java.util.concurrent包
简介 我们已经学习了形成Java并发程序设计基础的底层构建块,但对于实际编程来说,应该尽可能远离底层结构.使用由并发处理的专业人士实现的较高层次的结构要方便得多.要安全得多.例如,对于许多线程问题,可 ...
- Java JUC并发编程详解
Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...
最新文章
- 和tp数据库_CAN / CAN FD传输层(TP)详解
- opencv-mediapipe手部关键点识别
- 设计的核心任务之二:信息隐藏
- Cross_validation.train_test_split 中 stratify这个参数的意义是什么?
- 阿里如何实现100%容器化镜像化?八年技术演进之路回顾 1
- .NET做人脸识别并分类
- 10双屏鼠标过不去_灵耀X2 Duo双屏笔记本是怎样“炼”成的?对话华硕笔记本设计团队...
- 未来已来!分布式数据库的“星辰大海”绝不仅限于替换!
- 手机APP和微信小程序能否取代域名?
- html服装商品分类页面,商品分类.html
- VB之不能“VB6EXT.OLB”注册
- 【应用案例】CANape支持基于模型的ECU开发
- 制作U盘启动盘 优启通
- 徐艳(帮别人名字作诗)
- 为胎儿诵地藏经的好处(合集)怀孕的一定要看哦!
- 【转载】英语动词过去式ed的发音规则
- matlab射击小游戏,Matlab射箭小游戏设计,小虾米求救
- 计算机毕业设计JAVA软考在线题库系统mybatis+源码+调试部署+系统+数据库+lw
- S@Kura的PHP进阶之路(四)
- 微信公众号-消息推送
热门文章
- 支付宝php rsa签名验签工具,alipay rsa2 签名验证
- HTML form -enctype
- B9.流行的框架与新技术
- 数据库笔记01:SQL Server系统概述
- php加密 java rsa_PHP的DES加密和RSA签名(兼容java)
- mysql床数据库的命令_3种PHP连接MYSQL数据库的常用方法
- 曲线聚类_R语言确定聚类的最佳簇数:3种聚类优化方法
- vs2017 linux工程设置头文件,使用Visual Studio 2017作为Linux C++开发工具
- 连接器与加载器pdf_pdf转换为excel,你不会,同事点点鼠标2分钟就搞定了
- 【英语学习】【Daily English】U05 Places L03 I'd like to open an account