Java之JUC并发编程
JUC
- JUC概述
- 进程与线程
- 线程的状态
- wait / sleep的区别
- 并发和并行
- 管程
- 用户线程与守护线程
- 用户线程
- 守护线程
- Lock 接口
- synchronized
- Lock锁
- lock()
- 各种锁介绍
- Lock接口的实现类
- 线程间的通信
- Java集合中的线程安全
- 解决方案一:Vector
- 解决方案二:Collections
- 解决方案三:CopyOnWriteArrayList
- 其它集合的线程安全
- synchronized
- Callable 接口
- Future 接口
- Callable和Future 接口的应用场景
- JUC三大辅助类
- CountDownLatch
- CyclicBarrier
- Semaphore
- BlockingQueue 阻塞队列
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
- BlockingQueue 核心方法
- 线程池
- 线程池的分类
- Executors.newFixedThreadPool(int n)的使用
- Executors.newSingleThreadExecutor()的使用
- Executors.newCacheThreadPool()的使用
- 自定义线程池
- 拒绝策略
- 自定义实现线程池
- 线程池工作流程
- 注意事项
- Fork/Join
- Fork/Join 框架的实现原理
- 案例
- CompletableFuture
- CompletableFuture简介
- Future 与 CompletableFuture
JUC概述
JUC就是 java.util.concurrent 工具包的简称.这是一个处理线程的工具包。JDK1.5开始出现的。
进程与线程
进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,**是系统进行资源分配和调度的基本单元,是操作系统结构的基础。**在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。
线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运算单位。一个线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每个线程并行执行不同的任务。
总结:
进程:指系统中正在运行的一个应用程序,程序一旦运行就是进程;进程是资源分配的最小单位。
线程:系统分配处理器时间资源的基本单位,线程是程序执行的最小单位。
线程的状态
- New(新建):尚未启动的线程
- Runnable(准备就绪):可以运行的线程,但是需要等待处理器的调度。
- Blocked(阻塞):线程在等待monitor锁
- Waiting(等待状态):线程在无限等待唤醒
- Timed_Waiting(等待状态):线程在等待唤醒,但设置了时限
- Terminated(终结):线程已经完成执行。
wait / sleep的区别
- sleep是Thread的静态方法,wait是Object的方法,任何对象实例都能调用。
- sleep()方法正在执行的线程主动让出CPU(然后CPU就可以去执行其他任务),在sleep指定时间后CPU再回到该线程继续往下执行(注意:sleep方法只让出了CPU,而并不会释放同步资源锁!!!);
- wait()方法则是指当前线程让自己暂时退让出同步资源锁,以便其他正在等待该资源的线程得到该资源进而运行,只有调用了notify()方法,之前调用wait()的线程才会解除wait状态,可以去参与竞争同步资源锁,进而得到执行。
- sleep()方法可以在任何地方使用;wait()方法则只能在同步方法或同步块中使用;
- sleep()是线程线程类(Thread)的方法,调用会暂停此线程指定的时间,但监控依然保持,不会释放对象锁,到时间自动恢复;
- wait()是Object的方法,调用会放弃对象锁,进入等待队列,待调用notify()/notifyAll()唤醒指定的线程或者所有线程,才会进入锁池,再次获得对象锁才会进入运行状态;
- 它们都可以被interrupted方法中断。
并发和并行
并行:可以同时运行多个线程,多个线程同时执行,并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核 CPU。
并发(concurrent):指多个进程可以同时运行的现象。并不是真的同时运行。只是提供一种功能让用户看起来多个程序同时运行,在实际执行中,会进行进程的切换。
管程
管程是指管理共享变量以及对共享变量操作的过程,让它们支持并发。翻译成Java领域的语言,就是管理类的状态变量,让这个类是线程安全的。管程对应的英文是Monitor,直译为“监视器”,而操作系统领域一般翻译为“管程”。
synchronized关键字和wait()、notify()、notifyAll()这三个方法是Java中实现管程技术的组成部分。学习操作系统时,在线程一块还有信号量机制,管程在功能上和信号量及PV操作类似,属于一种进程同步互斥工具。Java选择管程来实现并发主要还是因为实现管程比较容易。
JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程(monitor)对象,管程(monitor)会随着 java 对象一同创建和销毁。
执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程。
用户线程与守护线程
在Java中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程)。
用户线程:当存在任何一个用户线程未离开,JVM是不会离开的。
守护线程:如果只剩下守护线程未离开,JVM是可以离开的。
通过Thread.setDaemon(false)设置为用户线程
通过Thread.setDaemon(true)设置为守护线程
setDaemon()方法必须在线程的start()方法之前调用,在后面调用会报异常,并且不起效
简而言之,当JVM中没有用户线程,将会退出。
用户线程
定义:不需要内核支持而在用户程序中实现的线程,其不依赖于操作系统核心,应用进程利用线程库提供创建、同步、调度和管理线程的函数来控制用户线程。平时用到的普通线程均是用户线程,当在Java程序中创建一个线程,它就被称为用户线程
守护线程
定义:是个服务线程,准确地来说就是服务其他的线程,这是它的作用——而其他的线程只有一种,那就是用户线程。所以java里线程分2种
- 守护线程,比如垃圾回收线程,就是最典型的守护线程。
- 用户线程,就是应用程序里的自定义线程
Lock 接口
synchronized
synchronized是Java中的关键字,是一种同步锁。
锁是用于通过多个线程控制对共享资源的访问的工具。 通常,锁提供对共享资源的独占访问:一次只能有一个线程可以获取锁,并且对共享资源的所有访问都要求首先获取锁。
可以修饰的对象有:
- 修饰一个代码块,被修饰的代码块称为同步语句块,其作用范围是大括号括起来的代码,作用的对象是调用这个代码块的对象;
- 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象。
- 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象;
- 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主的对象是这个类的所有对象。
如果一个代码块被synchronized修饰,当一个线程获取了对应的锁,并执行该代码块时,其它线程只能一直等待,获取锁的线程释放锁,唤醒阻塞的线程。
释放锁的情况:
- 获取锁的线程执行完了该代码块,然后线程释放对锁的占有。
- 线程执行发生异常,JVM会让线程自动释放锁。
如果获取锁的线程由于其他原因被阻塞了,但又没有释放锁,其他线程便只能等待。
Lock锁
Lock锁实现了提供比使用同步方法和语句可以获得的更广泛的锁操作。Lock提供了比synchronized更多的功能。
Lock 与的 Synchronized 区别
- synchronized 是 Java 语言的关键字,因此是内置特性。 Lock 是一个接口,通过这个接口的实现类可以实现同步访问;
- 采用 synchronized 不需要用户去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后,系统会自动让线程释放对锁的占用;Lock 则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
- Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断;
- 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
- Lock 可以提高多个线程进行读操作的效率
- 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于synchronized
public interface Lock {// 获得锁。void lock(); // 获取锁定,除非当前线程是 interrupted 。void lockInterruptibly() throws InterruptedException;// 只有在调用时才可以获得锁。boolean tryLock(); // 如果在给定的等待时间内是空闲的,并且当前的线程尚未得到 interrupted,则获取该锁。boolean tryLock(long time, TimeUnit unit) throws InterruptedException;// 释放锁。void unlock(); // 返回一个新Condition绑定到该实例Lock实例。Condition newCondition();
}
采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在 try{}catch{}块中进行,并且将释放锁的操作放在finally 块中进行,以保证锁一定被被释放,防止死锁的发生。
lock()
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
使用 lock() 的一般形式
Lock lock = ...;
lock.lock(); //获得锁
try{//处理任务
}catch(Exception ex){}finally{lock.unlock(); //释放锁
}
各种锁介绍
乐观锁对应于生活中乐观的人总是想着事情往好的方向发展,悲观锁对应于生活中悲观的人总是想着事情往坏的方向发展。
悲观锁:总是假设最坏的情况,每次去操作数据之前,都会认为别人会对数据进行修改,所以在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞,直到拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。
乐观锁:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。
版本号机制
一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。
CAS算法
compare and swap(比较与交换),是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。
CAS算法涉及到三个操作数
原来的值 V
保存比较的值 A
更新后的值 B
当且仅当 V 的值等于 A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作。一般情况下是一个自旋操作,即不断的重试。
表锁
表锁就是一锁锁一整张表,在表被锁定期间,其他事务不能对该表进行操作,必须等当前表的锁被释放后才能进行操作。
表锁的劣势:开销小;加锁快;不会出现死锁
表锁的优势:锁的粒度大,发生锁冲突的概率高;处理并发的能力弱
行锁
行锁是对表中的记录加锁,在记录锁定期间,其他事物不能对该记录进行操作,必须等当前记录的锁被释放后才能进行操作。
行锁的劣势:开销大;加锁慢;会出现死锁
行锁的优势:锁的粒度小,发生锁冲突的概率低;处理并发的能力强
Lock接口的实现类
Lock接口有三个实现类,ReentrantLock , ReentrantReadWriteLock.ReadLock , ReentrantReadWriteLock.WriteLock,下面分别介绍:
ReentrantLock 可重入锁
所谓可重入锁,即多次获得同一个锁,但在解锁时,也需要多次解锁,与加锁的次数一致。此锁最多支持同一个线程的2147483647递归锁。 尝试超过此限制会导致Error从锁定方法中抛出。
ReentrantLock是一个排他锁,同一时间只允许一个线程访问。
构造方法:
ReentrantLock()
ReentrantLock(boolean fair)
ReentrantLock() 创建一个ReentrantLock的实例。 这相当于使用ReentrantLock(false) 。
ReentrantLock(boolean fair) 根据给定的公平政策创建一个 ReentrantLock的实例。
fair - true如果此锁应使用合理的订购策略
公平锁
设置构造函数的公平参数, 当设置true ,为公平锁,在争用下,锁有利于授予访问最长等待的线程,即先来后到。
非公平锁
设置构造函数的公平参数, 当设置false,该锁不保证任何特定的访问顺序。由系统随机决定。默认为非公平锁。
使用公平锁的程序可能会比使用默认设置的整体吞吐量(即,更慢,通常要慢得多),但是具有更小的差异来获得锁定并保证缺乏饥饿。
ReentrantReadWriteLock读写锁,它表示两个锁,ReentrantReadWriteLock.ReadLock是读操作相关的锁 ,称为共享锁;ReentrantReadWriteLock.WriteLock是写相关的锁,称为排他锁
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。
线程进入读锁的前提条件:
- 没有其他线程的写锁,
- 没有写请求
- 有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
- 没有其他线程的读锁
- 没有其他线程的写锁
读读共享、写写互斥、读写互斥
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
锁降级:写的时候可以读,这时候,写锁就发生了降级,称为了读锁(同一个线程)
对于一个线程获取了写锁,可以继续进行读锁的获取,因为已经写锁,写锁是排他的。相当于锁重入。
过程:
- 先获取写锁
- 再获取读锁
- 释放写锁
- 释放读锁
使用 ReentrantReadWriteLock 锁
// 该锁由方法 readLock()返回。
static class ReentrantReadWriteLock.ReadLock
// 该锁由方法 writeLock()返回。
static class ReentrantReadWriteLock.WriteLock
获得读写锁
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
// 获得读锁
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
// 获得写锁
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
注意:
如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
线程间的通信
线程间通信的模型有两种:共享内存和消息传递。
内存模型:每个线程都有自己的工作内存。将主存中的数据缓存到工作内存,等完成操作之后,在同步到主存。
但是如果多个线程,对同一个共享资源进行操作,会产生同步问题。
方式一:使用 volatile 关键字
基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,多个线程同时监听一个变量,当其中一个线程对这个变量进行改变之后 ,其它线程具有可见性,就是会到主存中重新获取变量的最新值,以覆盖线程中工作内存的最新值。
方式二:使用Object类的 wait() 和 notify() ,notifyAll()方法。
如果调用某个对象的wait()方法,当前线程必须拥有这个对象的monitor(即锁),因此调用wait()方法必须在同步块或者同步方法中进行(synchronized块或者synchronized方法)。
notify()和notifyAll()方法只是唤醒等待该对象的monitor的线程,并不决定哪个线程能够获取monitor。
调用某个对象的wait()方法,相当于让当前线程交出此对象的锁(wait方法释放当前锁),然后进入等待状态,等待后续再次获得此对象的锁。
notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,然后继续往下执行(notify/notifyAll不会释放锁),直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。
notify唤醒沉睡的线程后,线程会接着上次的执行继续往下执行。所以在进行条件判断时候,可以先把 wait 语句忽略不计来进行考虑;显然,要确保程序一定要执行,并且要保证程序直到满足一定的条件再执行,要使用while进行等待,直到满足条件才继续往下执行。
方法三:Condition
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()、notifyAll()实现线程间的协作,相比使用Object的wait()、notify()、notifyAll()使用Condition的await()、signal()、signalAll这种方式实现线程间协作更加安全和高效。
Condition是个接口依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用。
Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
Java集合中的线程安全
常用的集合类型如ArrayList,HashMap,HashSet等,在并发环境下修改操作都是线程不安全的,会抛出java.util.ConcurrentModificationException异常。
以 ArrayList为例,在多线程同时对集合进行操作。
public static void main(String[] args) {ArrayList<String> list = new ArrayList<>();for(int i=0; i<100 ; i++) {new Thread(()->{list.add(UUID.randomUUID().toString().substring(0,8));System.out.println(list.size());},":"+i).start();}
}
在多个线程同时对list集合进行读写操作时,会出现Exception in thread “:37” java.util.ConcurrentModificationException这个异常。那是因为多个线程同时对集合进行了读写。
add方法源码,可以看出集合在多线程环境下不支持线程安全
public boolean add(E e) {ensureCapacityInternal(size + 1); // Increments modCount!!elementData[size++] = e;return true;
}
解决方案一:Vector
Vector 是矢量队列,它是 JDK1.0 版本添加的类。继承于 AbstractList,实现了 List, RandomAccess, Cloneable 这些接口。 Vector 继承了 AbstractList,实现了 List;所以, 它是一个队列,支持相关的添加、删除、修改、遍历等功能。 Vector 实现了 RandmoAccess 接口,即提供了随机访问功能。
RandmoAccess 是 java 中用来被 List 实现,为 List 提供快速访问功能的。在Vector 中,我们即可以通过元素的序号快速获取元素对象;这就是快速随机访问。 Vector 实现了 Cloneable 接口,即实现 clone()函数。它能被克隆。
和 ArrayList不同,Vector是支持线程安全的。
Vector中add方法源码,可以看出,方法中加了 synchronized关键字,方法是加锁的,所以支持线程安全,将上一个例子中 ArrayList换成 Vector就行。
public synchronized boolean add(E e) {modCount++;ensureCapacityHelper(elementCount + 1);elementData[elementCount++] = e;return true;
}
解决方案二:Collections
Collections工具类是一个集合的工具类,其中有一些方法可以实现集合的线程安全。
提供了很多实现线程安全的方法。
synchronizedList 方法为例。参数为一个普通的 ArrayList对象,返回值是一个线程安全的list对象。
public static void main(String[] args) {List list = Collections.synchronizedList(new ArrayList<>());for (int i = 0; i < 100; i++) {new Thread(() ->{list.add(UUID.randomUUID().toString());System.out.println(list);}, "线程" + i).start();}
}
解决方案三:CopyOnWriteArrayList
CopyOnWriteArrayList相当于线程安全的ArrayList。和ArrayList一样,它是个可变数组,但是和ArrayList不同。
使用CopyOnWriteArrayList 来保证线程安全。
public static void main(String[] args) {ArrayList<String> list = new ArrayList<>();Vector vector = new Vector();List list = new CopyOnWriteArrayList<String>();for(int i=0; i<500 ; i++) {new Thread(()->{list.add(UUID.randomUUID().toString().substring(0,8));System.out.println(list);},":"+i).start();}
}
CopyOnWriteArrayList 的原理
内部有个volatile 数组array来保存数据。在更新数据时,都会新建一个副本,并将更新后的数据写到新建的副本中,再该数组赋值给volatile 数组,这就是它叫做 CopyOnWriteArrayList 的原因。
由于它在更新数据时,都会新建副本,所以涉及到修改数据的操作, CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,效率比较高。
CopyOnWriteArrayList 读取操作的实现:
读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。
CopyOnWriteArrayList更新操作的实现:
CopyOnWriteArrayList在对集合进行更新操作的时候加了锁,保证同步,避免多线程写的时候会 copy 出多个副本(写写互斥)。
线程安全机制
- 通过 volatile 和互斥锁来实现的。
- 通过“volatile 数组” 来保存数据的。一个线程读取 volatile 数组时,volatile关键字保证了可见性,通过 volatile 提供了“读取到的数据总是最新的” 这个机制的保证。
- 通过互斥锁来保护数据。在更新数据时,会先获取互斥锁,再修改完毕之后,先将数据更新volatile 数组中,然后再释放互斥锁,就达到了保护数据的目的。
其它集合的线程安全
HashMap对应的线程安全的集合 HashTable
可以使用 Collections 工具类中的synchronizedMap方法获取线程安全的集合。
也可以使用 ConcurrentHashMap线程安全的HashMap,与 CopyOnWriteArrayList 使用一样。
HashSet可以使用 Collections 工具类中的synchronizedSet方法获取线程安全的集合。
也可以使用CopyOnWriteArraySet,线程安全的HashSet,与 CopyOnWriteArrayList 使用一样。
synchronized
一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用其中的一个 synchronized 方法了,其它的线程都只能等待,某一个时刻内,只能有唯一一个线程去访问这些
synchronized 方法,这是因为锁的是当前对象 this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized 方法。
synchronized 实现同步的基础: Java 中的每一个对象都可以作为锁
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的 Class 对象。
对于同步方法块,锁是 Synchonized 括号里配置的对象
所有的静态同步方法用的也是同一把锁——类对象本身,类对象本身与实例对象不是一个锁,所以静态同步方法与非静态同步方法之间是不会有竟态条件的。
Callable 接口
callable接口是创建线程的一种方式。
在java中有四种创建线程的方式
- 继承Thread类,重写run方法。
- 实现Runnable接口,在run方法写代码,无返回值。
- 实现Callable接口,在 call方法中写代码,可以有返回值。
- 使用线程池。
Callable接口与Runnable接口的区别:Callable接口提供了一个call() 方法作为线程执行体,call()方法比run()方法功能要强大。call()方法可以有返回值,call()方法可以声明抛出异常。
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
使用 Callable接口必须通过 Future接口来进行管理,不能直接由创建线程。因为 Thread类的构造方法中没有接受 Callable的参数。
Future 接口
当 call() 方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。
Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下:
// 如果任务尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true时才会中断任务。
public boolean cancel(boolean mayInterrupt)
// 如果任务完成,将立即返回结果,否则将等待任务完成。然后返回结果。
public Object get()
// 如果任务完成,则返回 true,否则返回 false
public boolean isDone()
Future接口的具体实现类 FutureTask类,该类实现 RunnableFuture接口,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建Thread 对象。
在 FutureTask类中的 run 方法中,间接调用了 call方法。
Callable和Future 接口的应用场景
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成。
当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态。
仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法(一般在最后的时候获取 get)。只计算一次,下次获得结果的时候,就不用再重新开始计算。
get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
示例:
// 自定义类,实现Callable 接口中的call方法。
class CallAble implements Callable<Integer>{@Overridepublic Integer call() {return 1234;}
}
// 使用FutureTask间接调用CallAble实现类
FutureTask<Integer> futureTask = new FutureTask(new CallAble());
new Thread(futureTask).start();
// 在需要的时候,使用 get 方法,获得返回值
Integer integer = futureTask.get();
JUC三大辅助类
JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时 Lock 锁的频繁操作。
CountDownLatch: 减少计数
CyclicBarrier: 循环栅栏
Semaphore: 信号灯
CountDownLatch
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。 CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。
CountDownLatch中的方法
// 构造方法,初始化计数值
CountDownLatch(int count)
// 导致当前线程等到锁存器计数到零,除非线程是 interrupted 。
void await()
// 使当前线程等待直到锁存器计数到零为止,除非线程为 interrupted或指定的等待时间过去。
boolean await(long timeout, TimeUnit unit)
// 减少锁存器的计数,每次减一,如果计数达到零,释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()
// 返回一个标识此锁存器的字符串及其状态。
String toString()
示例:场景: 6 个同学陆续离开教室后值班同学才可以关门。
public class CountDownLatchDemo {public static void main(String[] args) throws Exception{CountDownLatch countDownLatch = new CountDownLatch(6);//创建 6 个线程for (int i = 1; i <= 6; i++) {new Thread(() ->{try{if(Thread.currentThread().getName().equals("同学 6")){Thread.sleep(2000);}System.out.println(Thread.currentThread().getName() + "离开了");//计数器减一,不会阻塞countDownLatch.countDown();}catch (Exception e){e.printStackTrace();}}, "同学" + i).start();}//主线程 await 休息System.out.println("主线程睡觉");//全部离开后自动唤醒主线程countDownLatch.await();System.out.println("全部离开了,现在的计数器为" +countDownLatch.getCount());}
}
CyclicBarrier
允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。
CyclicBarrier的方法
//创建一个CyclicBarrier,当给定数量的线程的等待时,它将跳闸,并且屏障跳闸时不执行预定义的动作。
CyclicBarrier(int parties)
// 创建一个CyclicBarrier,当给定数量的线程等待时,它将跳闸,屏障跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行。
CyclicBarrier(int parties, Runnable barrierAction)
// 等待所有parties已经在这个障碍上调用await。
int await()
// 等待所有parties已经在此屏障上调用await,或指定的等待时间过去。
int await(long timeout, TimeUnit unit)
// 返回目前正在等待障碍的各方的数量。
int getNumberWaiting()
// 返回旅行这个障碍所需的聚会数量。
int getParties()
// 查询这个障碍是否处于破碎状态。
boolean isBroken()
// 将屏障重置为初始状态。
void reset()
每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。可以将 CyclicBarrier.await() 理解为加 1 操作。如果在 await加一完成之后,达到目标屏障数,将会由最后一个线程执行预定的操作。
示例:场景: 集齐 7 颗龙珠就可以召唤神龙
public class CyclicBarrierDemo {//定义神龙召唤需要的龙珠总数,目标屏障数private final static int NUMBER = 7;public static void main(String[] args) {//定义循环栅栏CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () ->{System.out.println("集齐" + NUMBER + "颗龙珠,现在召唤神龙!!!!!!!!!");});//定义 7 个线程分别去收集龙珠for (int i = 1; i <= 7; i++) {new Thread(()->{try {System.out.println(Thread.currentThread().getName() + "收集到了");cyclicBarrier.await();}catch (Exception e){e.printStackTrace();}}, "龙珠" + i + "号").start();}}
}
Semaphore
一个计数信号量。 在概念上,信号量维持一组许可证。 在调用acquire()可能会阻塞,直到许可证可用,然后才能使用它。 每个release()释放已经获得的许可证,潜在地释放阻塞获取方。 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。
Semaphore中的方法
// 创建一个Semaphore与给定数量的许可证和非公平公平设置。
Semaphore(int permits)
// 创建一个 Semaphore与给定数量的许可证和给定的公平设置。
Semaphore(int permits, boolean fair)
// 从该信号量获取许可证,阻止直到可用,或线程为interrupted。
void acquire()
// 从该信号量获取给定数量的许可证,阻止直到所有可用,否则线程为 interrupted。
void acquire(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 释放许可证,将其返回到信号量。
void release()
// 释放给定数量的许可证,将其返回到信号量。
void release(int permits)
示例:场景: 抢车位, 6 部汽车 3 个停车位
public class SemaphoreDemo {public static void main(String[] args) throws Exception{//定义 3 个停车位Semaphore semaphore = new Semaphore(3);//模拟 6 辆汽车停车for (int i = 1; i <= 6; i++) {new Thread(() ->{try {System.out.println(Thread.currentThread().getName()+"找车位");// 获取许可证,若当前没有可用的许可证,则阻塞semaphore.acquire();System.out.println(Thread.currentThread().getName()+"停车成功!");Thread.sleep(2000);}catch (Exception e){e.printStackTrace();}finally {System.out.println(Thread.currentThread().getName() + "溜了");semaphore.release();}}, "汽车" + i).start();}}
}
BlockingQueue 阻塞队列
Concurrent 包中, BlockingQueue 很好的解决了多线程中,如何高效安全“传输” 数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出。
当队列是空的,从队列中获取元素的操作将会被阻塞,直到其他线程往空的队列插入新的元素。
当队列是满的,从队列中添加元素的操作将会被阻塞,直到其他线程从队列中移除一个或多个元素。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者” 和“消费者” 模型中,通过队列可以很便利地实现两者之间的数据共享。
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
阻塞队列的结构
BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue,DelayQueue,LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue。
BlockingQueue接口中的方法
// 将元素插入队列中,如果有容量,成功后返回true,如果当前没有容量,则抛出IllegalStateException。
boolean add(E e)
// 如果此队列包含指定的元素,则返回 true 。
boolean contains(Object o)
// 从该队列中删除所有可用的元素,并将它们添加到给定的集合中。
int drainTo(Collection<? super E> c)
// 最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements)
// 将元素插入队列中,如果有容量,成功时返回true,如果当前没有容量,则返回false。
boolean offer(E e)
// 将元素插入队列中,指定的等待时间,才能使空间变得可用。
boolean offer(E e, long timeout, TimeUnit unit)
// 检索并删除此队列的头,等待指定的等待时间使元素变为可用。
E poll(long timeout, TimeUnit unit)
// 将指定的元素插入到此队列中,等待空格可用。
void put(E e)
// 返回该队列最好可以接受而不会阻塞,或附加元素的数量Integer.MAX_VALUE如果没有固有的限制。
int remainingCapacity()
// 从该队列中删除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 检索并删除此队列的头,如有必要,等待元素可用。
E take()
ArrayBlockingQueue
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外, ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行。
由数组结构构成的有界阻塞队列。
LinkedBlockingQueue
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
LinkedBlockingQueue 能够高效的处理并发数据,因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
由链表结构组成的有界阻塞队列(大小默认为 integer.MAX_VALUE)
DelayQueue
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。 DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用优先级队列实现的延迟无界阻塞队列。
PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。
因此使用的时候要特别注意, 生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。
支持优先级排序的无界阻塞队列。
BlockingQueue 核心方法
线程池
线程池简介:线程池(英语: thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
主要特点为:
- 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
- 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资
源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池的实现:
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor, Executors,
ExecutorService, ThreadPoolExecutor 这几个类。
使用 Executors 工具类来实现线程的创建
线程池的分类
线程池的返回值ExecutorService简介:ExecutorService是Java提供的用于管理线程池的接口。该接口的两个作用:控制线程数量和重用线程。
Executors.newFixedThreadPool(int n) :创建一个线程池,线程池中有n个线程。
Executors.newSingleThreadExecutor():创建一个线程池,线程池中只有一个线程。
Executors.newCacheThreadPool():创建一个线程池,线程池中线程数可以根据情况自动扩容。
自定义线程池ThreadPoolExecutor():自定义设置参数,创建线程池。
Executors.newFixedThreadPool(int n)的使用
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务。在某个线程被显式地关闭之前,池中的线程将一直存在。
特征:
- 线程池中的线程处于一定的量,可以很好的控制线程的并发量
- 线程可以重复被使用,在显示关闭之前,都将一直存在
- 超出一定量的线程被提交时候需在队列中等待
代码示例:
public static void main(String[] args) {// 创建固定的5个线程ExecutorService threadPool = Executors.newFixedThreadPool(5); try {for(int i = 1;i <= 10; i++) {//使用线程池中的线程threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+" 办理业务");});}}catch (Exception e) {e.printStackTrace();}finally {//关闭线程池threadPool.shutdown();}
}
Executors.newSingleThreadExecutor()的使用
作用:创建一个使用单个任务线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的
newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行。
public static void main(String[] args) {// 创建线程池,线程池中只有一个活动的线程。ExecutorService threadPool = Executors.newSingleThreadExecutor(); try {for(int i = 1;i <= 10; i++) {//使用线程池中的线程threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+" 办理业务");});}}catch (Exception e) {e.printStackTrace();}finally {//关闭线程池threadPool.shutdown();}
}
Executors.newCacheThreadPool()的使用
作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
特点:
- 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
- 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
- 当线程池中,没有可用线程,会重新创建一个线程
public static void main(String[] args) {// 创建线程池,线程池中只有一个活动的线程。ExecutorService threadPool = Executors.newCacheThreadPool(); try {for(int i = 1;i <= 10; i++) {//使用线程池中的线程threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+" 办理业务");});}}catch (Exception e) {e.printStackTrace();}finally {//关闭线程池threadPool.shutdown();}
}
自定义线程池
分析源码,在创建上面三个线程池时,都是使用ThreadPoolExecutor这个方法进行创建的,只不过在创建时,参数不同而已。所以在我们创建自定义线程池时,只要根据需求设置对应的参数就好了。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
先来分析 ThreadPoolExecutor 方法的参数的含义。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
corePoolSize:线程池的核心线程数(即活动线程的数量)。
maximumPoolSize:线程池中的最大线程数。
keepAliveTime:空闲线程的存活时间。
unit:存活时间的单位
workQueue:存放提交但是来不及执行的队列
其中在方法内部还省略了两个参数,都使用的默认值
threadFactory:创建线程的工厂类,默认值 Executors.defaultThreadFactory()
handler:等待队列满后的拒绝策略,默认值 defaultHandler
拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略: corePoolSize - 核心线程数,也即最小的线程数。 workQueue - 阻塞队列 。 maximumPoolSize -最大线程数,当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到最大线程数,此时,再多余的任务,则会触发线程池的拒绝策略了。
当提交的任务数大于workQueue.size() + maximumPoolSize,就会触发线程池的拒绝策略。
四种不同的拒绝策略:
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大。
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入。
DiscardPolicy: 直接丢弃,其他啥都没有
自定义实现线程池
只需要根据实际需求设置好参数即可
ExecutorService threadPool = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()
);
线程池工作流程
在创建了线程池后,线程池中的线程数为零,只有在真正的使用线程池的时候,才进行创建线程。
当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务。
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
- 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务。
- 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动拒绝策略来执行。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
注意事项
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致内存溢出。所以一般实际生产自己通过ThreadPoolExecutor 的 7 个参数,自定义线程池。
Fork/Join
Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子
任务结果合并成最后的计算结果,并进行输出。
Fork/Join 框架要完成两件事情:
Fork:把一个复杂任务进行分拆,大事化小。
Join:把分拆任务的结果进行合并。
任务分割:首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。
执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
使用的类:
ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类。 Fork/Join 框架提供了两个子类:
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务
ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行。
Fork/Join 框架的实现原理
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而ForkJoinWorkerThread 负责执行这些任务。
Fork 方法的实现原理:
当我们调用 ForkJoinTask 的 fork 方法时,程序会把任务放在ForkJoinWorkerThread 的 pushTask 的 workQueue 中,异步地执行这个任务,然后立即返回结果。
public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}
}
pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) { // ignore if queue removedint m = a.length - 1; // fenced write for task visibilityU.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) {if ((p = pool) != null)p.signalWork(p.workQueues, this);}else if (n >= m)growArray();}
}
Join方法的实现原理:
它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有 4 种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
如果任务状态是已完成,则直接返回任务结果。
如果任务状态是被取消,则直接抛出 CancellationException
如果任务状态是抛出异常,则直接抛出对应的异常
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();
}
doJoin 方法执行流程:
- 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
- 如果没有执行完,则从任务数组里取出任务并执行。
- 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();
}
案例
场景: 生成一个计算任务,计算 1+2+3…+100,每10个数切割成一个子任务。
class MyTask extends RecursiveTask<Integer> {//拆分差值不能超过10,计算10以内运算private static final Integer VALUE = 10;private int begin ;//拆分开始值private int end;//拆分结束值private int result ; //返回结果//创建有参数构造public MyTask(int begin,int end) {this.begin = begin;this.end = end;}//拆分和合并过程@Overrideprotected Integer compute() {//判断相加两个数值是否大于10if((end-begin)<=VALUE) {//相加操作for (int i = begin; i <= end; i++) {result = result+i;}} else {//进一步拆分//获取中间值int middle = (begin+end)/2;//拆分左边MyTask task01 = new MyTask(begin,middle);//拆分右边MyTask task02 = new MyTask(middle+1,end);//调用方法拆分task01.fork();task02.fork();//合并结果result = task01.join()+task02.join();}return result;}
}
public class ForkJoinDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建MyTask对象MyTask myTask = new MyTask(0,100);//创建分支合并池对象ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);//获取最终合并之后结果Integer result = forkJoinTask.get();System.out.println(result);//关闭池对象forkJoinPool.shutdown();}
}
CompletableFuture
CompletableFuture简介
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。
Future 与 CompletableFuture
Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:
(1) 不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
(2) 不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能。
(3) 不支持链式调用
对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。
(4) 不支持多个Future合并
比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的。
(5) 不支持异常处理
Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。
CompletableFuture类中的方
// 等待这个未来完成的必要,然后返回结果。
T get()
// 如果有必要等待这个未来完成的给定时间,然后返回其结果(如果有的话)。
T get(long timeout, TimeUnit unit)
// 返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。
static CompletableFuture<Void> runAsync(Runnable runnable)
// 返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 返回一个新的CompletableFuture,它通过在ForkJoinPool.commonPool()中运行的任务与通过调用给定的供应商获得的值异步完成。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Java之JUC并发编程相关推荐
- 厚积薄发打卡Day26:狂神说Java之JUC并发编程<代码+笔记>(上)
前言: 学习视频来源:[狂神说Java]JUC并发编程最新版通俗易懂 一个十分优秀且励志的技术大牛+Java讲师,十分推荐他的频道:遇见狂神说
- 基于《狂神说Java》JUC并发编程--学习笔记
前言: 本笔记仅做学习与复习使用,不存在刻意抄袭. -------------------------------------------------------------------------- ...
- Java JUC并发编程详解
Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...
- JUC并发编程(java util concurrent)(哔站 狂神说java juc并发编程 摘录笔记)
JUC并发编程(java util concurrent) 1.什么是JUC JUC并不是一个很神秘的东西(就是 java.util 工具包.包.分类) 业务:普通的线程代码 Thread Runna ...
- java书籍_还搞不定Java多线程和并发编程面试题?你可能需要这一份书单!
点击蓝色"程序员书单"关注我哟 加个"星标",每天带你读好书! 在介绍本书单之前,我想先问一下各位读者,你们之前对于Java并发编程的了解有多少呢.经过了1 ...
- JUC并发编程中的集合不安全问题源码解析
JUC并发编程四:集合不安全(Java) 1.List不安全! 代码示例: package unsafe;import java.util.*; import java.util.concurrent ...
- ❤️《JUC并发编程从入门到高级》(建议收藏)❤️
JUC并发编程 1.什么是JUC JUC的意思就是java并发编程工具包,与JUC相关的有三个包:java.util.concurrent.java.util.concurrent.atomic.ja ...
- JUC并发编程第十四篇,StampedLock(邮戳锁)为什么比ReentrantReadWriteLock(读写锁)更快!
JUC并发编程第十四篇,StampedLock(邮戳锁)为什么比ReentrantReadWriteLock(读写锁)更快! 一.ReentrantReadWriteLock(读写锁) 1.读写锁存在 ...
- 多线程进阶=》JUC并发编程02
在JUC并发编程01中说到了,什么是JUC.线程和进程.Lock锁.生产者和消费者问题.8锁现象.集合类不安全.Callable(简单).常用辅助类.读写锁 https://blog.csdn.net ...
最新文章
- 世界最优秀的分布式文件系统架构演进之路
- VC++实现获取网络时间
- springcloud 返回json
- html中刷新按钮的代码,常见的按钮类型 点击button刷新的几种常用代码
- 【个人重点】开发中应该重视的几点
- 设计模式(一):从三类模式六种原则看设计模式
- PyTorch基础(六)迁移学习
- Unity2D 小游戏之 RocketMouse
- oh my Zsh使用手册
- cmd 查看端口占用情况,及对应进程,杀进程
- Linux基础教程(第二版)课后答案自主整理
- ab压力 failed_Apache ab压力测试时出现大量的错误原因分析
- java 手机推荐_Java一样也智能 中低端手机上网助手推荐
- 计算机锁屏之后QQ音乐停止播放了,MAC电脑如何在息屏状态下让QQ音乐能继续播放音乐...
- 品优购项目笔记(十四):微信支付
- uni-app接入友盟
- Redis集群Hash槽分配异常 CLUSTERDOWN Hash slot not served的解决方式
- 什么是空头陷阱?(全网最全面的分析)?
- js相关面试题20道
- 亲测可用:两个在线的测试数据生成工具