紧接上一篇Java 多线程基础(上),
本文目录结构如下:
8、使用ReentrantLock
9、使用Condition
10、使用ReadWriteLock
11、使用StampedLock
12、使用Concurrent集合
13、使用Atomic
14、使用线程池
15、使用Future
16、使用CompletableFuture
17、使用ForkJoin
18、使用ThreadLocal

8 使用ReentrantLock

我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,我们来看一下传统的synchronized代码:

public class Counter {private int count;public void add(int n) {synchronized(this) {count += n;}}
}

如果用ReentrantLock替代,可以把代码改造为:

public class Counter {private final Lock lock = new ReentrantLock();private int count;public void add(int n) {lock.lock();try {count += n;} finally {lock.unlock();}}
}

因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。
ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。和synchronized不同的是,ReentrantLock可以尝试获取锁:

if (lock.tryLock(1, TimeUnit.SECONDS)) {try {...} finally {lock.unlock();}
}

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。
小结:

  • ReentrantLock可以替代synchronized进行同步;

  • ReentrantLock获取锁更安全;

  • 必须先获取到锁,再进入try {…}代码块,最后使用finally保证释放锁;

  • 可以使用tryLock()尝试获取锁。

9 使用Condition

使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。但是,synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?答案是使用Condition对象来实现wait和notify的功能。我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现:

//synchronized 实现方式
class TaskQueue {private Queue<String> queue = new LinkedList<>();public synchronized void addTask(String s) {this.queue.add(s);this.notifyAll();}public synchronized String getTask() throws InterruptedException {while (queue.isEmpty()) {this.wait();}return queue.remove();}
}//ReentrantLock实现方式
class TaskQueueByReentrantLock{/*使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。*/private final Lock lock = new ReentrantLock();private final Condition condition = lock.newCondition();private Queue<String> q = new LinkedList<>();public void addTack(String s){lock.lock();try {q.add(s);condition.signalAll();} finally {lock.unlock();}}public String getTask() throws InterruptedException{lock.lock();try {while(q.isEmpty()){//唤醒的线程从await()返回后需要重新获得锁才能进行下一行操作condition.await();//和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:if(condition.await(1, TimeUnit.SECONDS)){// // 被其他线程唤醒}else {// 指定时间内没有被其他线程唤醒}}return q.remove();} finally {lock.unlock();}}
}

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;

  • signal()会唤醒某个等待线程;

  • signalAll()会唤醒所有等待线程;

  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {// 被其他线程唤醒
} else {// 指定时间内没有被其他线程唤醒
}

小结:

  • Condition可以替代wait和notify;

  • Condition对象必须从Lock对象获取。

10 使用ReadWriteLock

前面讲到的ReentrantLock保证了只有一个线程可以执行临界区代码:

public class Counter {private final Lock lock = new ReentrantLock();private int[] counts = new int[10];public void inc(int index) {lock.lock();try {counts[index] += 1;} finally {lock.unlock();}}public int[] get() {lock.lock();try {return Arrays.copyOf(counts, counts.length);} finally {lock.unlock();}}
}

但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用inc()方法是必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待:

|

允许
不允许

使用ReadWriteLock可以解决这个问题,它保证:

  • 只允许一个线程写入(其他线程既不能写入也不能读取);

  • 没有写入时,多个线程允许同时读(提高性能)。

用ReadWriteLock实现这个功能十分容易。我们需要创建一个ReadWriteLock实例,然后分别获取读锁和写锁:

public class CounterReadWrite {private final ReadWriteLock rwlock = new ReentrantReadWriteLock();private final Lock rlock = rwlock.readLock();private final Lock wlock = rwlock.writeLock();private int[] counts = new int[10];public void inc(int index){wlock.lock();try {counts[index] += 1;} finally {wlock.unlock();}}public int[] getCounts(){rlock.lock();try {return Arrays.copyOf(counts,counts.length);} finally {rlock.unlock();}}
}/*
使用ReadWriteLock可以提高读取效率:ReadWriteLock只允许一个线程写入;ReadWriteLock允许多个线程在没有写入时同时读取;ReadWriteLock适合读多写少的场景。*/

把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。
例如,一个论坛的帖子,回复可以看做写入操作,它是不频繁的,但是,浏览可以看做读取操作,是非常频繁的,这种情况就可以使用ReadWriteLock。
小结:
使用ReadWriteLock可以提高读取效率:

  • ReadWriteLock只允许一个线程写入;

  • ReadWriteLock允许多个线程在没有写入时同时读取;

  • ReadWriteLock适合读多写少的场景。

11使用StampedLock

前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。
如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。例子如下:

public class TestStampedLock {private final StampedLock stampedLock = new StampedLock();private double x,y;public void move(double deltaX, double deltaY) {long stamp = stampedLock.writeLock(); // 获取写锁try {x += deltaX;y += deltaY;} finally {stampedLock.unlockWrite(stamp); // 释放写锁}}public double distanceFromOrigin() {long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁Map<Integer,String> map = new ConcurrentHashMap<>();
// Map<Integer,String> unsafe = new HashMap<>();
// Map<Integer,String> safe = Collections.synchronizedMap(unsafe);// 注意下面两行代码不是原子操作// 假设x,y = (100,200)double currentX = x;// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)double currentY = y;// 此处已读取到y,如果没有写入,读取是正确的(100,200)// 如果有写入,读取是错误的(100,400)if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生stamp = stampedLock.readLock(); // 获取一个悲观读锁try {currentX = x;currentY = y;} finally {stampedLock.unlockRead(stamp); // 释放悲观读锁}}return Math.sqrt(currentX * currentX + currentY * currentY);}
}
/*
StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;StampedLock是不可重入锁。*/

和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
小结:

  • StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;

  • StampedLock是不可重入锁。

12使用Concurrent集合

我们在前面已经通过ReentrantLock和Condition实现了一个BlockingQueue:

public class TaskQueue {private final Lock lock = new ReentrantLock();private final Condition condition = lock.newCondition();private Queue<String> queue = new LinkedList<>();public void addTask(String s) {lock.lock();try {queue.add(s);condition.signalAll();} finally {lock.unlock();}}public String getTask() {lock.lock();try {while (queue.isEmpty()) {condition.await();}return queue.remove();} finally {lock.unlock();}}
}

BlockingQueue的意思就是说,当一个线程调用这个TaskQueue的getTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。
因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue。
除了BlockingQueue外,针对List、Map、Set、Deque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:

interface non-thread-safe thread-safe
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingDeque

使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");

因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

Map<String, String> map = new HashMap<>();
//改为:
Map<String, String> map = new ConcurrentHashMap<>();

java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:

Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。
小结:

  • 使用java.util.concurrent包提供的线程安全的并发集合可以大大简化多线程编程:

  • 多线程同时读写并发集合是安全的;

  • 尽量使用Java标准库提供的并发集合,避免自己编写同步代码。

13使用Atomic

Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。我们以AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)

  • 加1后返回新值:int incrementAndGet()

  • 获取当前值:int get()

  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。如果我们自己通过CAS编写incrementAndGet(),它大概长这样:

public int incrementAndGet(AtomicInteger var) {int prev, next;do {prev = var.get();next = prev + 1;} while ( ! var.compareAndSet(prev, next));return next;
}

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do … while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

class IdGenerator {AtomicLong var = new AtomicLong(0);public long getNextId() {return var.incrementAndGet();}
}

通常情况下,我们并不需要直接用do … while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。在高度竞争的情况下,还可以使用Java 8提供的LongAdder和LongAccumulator。
小结:

  • 使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

  • 原子操作实现了无锁的线程安全;

  • 适用于计数器,累加器等。

14使用线程池

Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。如果可以复用一组线程:那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;

  • CachedThreadPool:线程数根据任务动态调整的线程池;

  • SingleThreadExecutor:仅单线程执行的线程池。

创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:

public class TestThreadPool2 implements Runnable {private String name;private static volatile AtomicInteger counter = new AtomicInteger(0);public void add(){counter.incrementAndGet();}public TestThreadPool2(String name){this.name = name;}@Overridepublic void run() {System.out.println("start name" + name);
// add();try {Thread.sleep(100);} catch (InterruptedException e) {}System.out.println("end name" + name);}public static void main(String[] args) throws InterruptedException{ExecutorService es = Executors.newFixedThreadPool(4);
// ExecutorService es = Executors.newCachedThreadPool();for(int i = 0;i<7;i++){es.submit(new TestThreadPool2(" " + i));}Thread.sleep(1000);
//关闭线程池es.shutdown();}}

我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。小结:JDK提供了ExecutorService实现了线程池功能:

  • 线程池内部维护一组线程,可以高效执行大量小任务;

  • Executors提供了静态方法创建不同类型的ExecutorService;

  • 必须调用shutdown()关闭ExecutorService;

  • ScheduledThreadPool可以定期调度多个任务。

15使用Future

在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行:

class Task implements Runnable {public String result;public void run() {this.result = longTimeCalculation();}
}

Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:

class Task implements Callable<String> {public String call() throws Exception {return longTimeCalculation();}
}

多线程一直是Java 面试中常考的基础知识,之前一直没有系统的学习过,这段时间对着廖雪峰大师的讲义从新把该知识内容整理一遍,该文章系列内容全部来源于廖雪峰官方网站Java 基础教程
并且Callable接口是一个泛型接口,可以返回指定类型的结果。如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:

ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)

  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;

  • cancel(boolean mayInterruptIfRunning):取消当前任务;

  • isDone():判断任务是否已完成。

16使用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。我们以获取股票价格为例,看看如何使用CompletableFuture:

public class TestCompletableFuture {public static void main(String[] args) throws Exception{//创建异步执行任务CompletableFuture<Double> completableFuture = CompletableFuture.supplyAsync(TestCompletableFuture::fetchPrice);//执行成功completableFuture.thenAccept((result) -> {System.out.println("price:" + result);});//执行异常completableFuture.exceptionally((e) ->{System.out.println("这是异常信息出口");e.printStackTrace();return null;});//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static double fetchPrice(){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}if(Math.random() < 0.3){System.out.println("fetch is failed");}// int i = 1/0;return 5 + Math.random()*20;}
}

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

public interface Supplier<T> {T get();
}

用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:

public interface Consumer<T> {void accept(T t);
}

异常时,CompletableFuture会调用Function对象:

public interface Function<T, R> {R apply(T t);
}

这里我们都用lambda语法简化了代码。
可见CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;

  • 异步任务出错时,会自动回调某个对象的方法;

  • 主线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

/*** @Auther Mario* @Date 2020-12-02 20:24* @Version 1.0** 验证CompletableFuture 串行执行*/
public class TestCompletableFutureCuan {public static void main(String[] args) throws Exception{//第一个任务CompletableFuture<String> querycode = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油");});// queryCode 成功后执行第二个任务CompletableFuture<Double> qureyPrice = querycode.thenApplyAsync((code) -> {return queryPrice(code);});//成功打印结果qureyPrice.thenAccept((result) -> {System.out.println("price:" + result);});qureyPrice.exceptionally((e) ->{System.out.println("出错了");return null;});//主线程不要立马结束,等待CompletableFuture结束Thread.sleep(200);}static String queryCode(String name){try {Thread.sleep(100);} catch (InterruptedException e) {return "queryCode err";}return "601875";}static double queryPrice(String code){try {Thread.sleep(100);} catch (InterruptedException e) {return 5555d;}
// return 5d + Math.random() * 20d;return 5 + Math.random() * 20;}
}

除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

public class TestCompletableFutureBing {public static void main(String[] args) throws Exception{// 两个CompletableFuture执行异步查询:CompletableFuture<String> cfcodeSina = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油","http://www.sina.com....");});CompletableFuture<String> cfcode163 = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油","http://www.163.com....");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfcode163,cfcodeSina);// 两个CompletableFuture执行异步查询:CompletableFuture<Double> cfpriceSina = cfQuery.thenApplyAsync((code) -> {return queryPrice((String) code,"http://www.sina.com....");});CompletableFuture<Double> cfPrice163 = cfQuery.thenApplyAsync((code) -> {return queryPrice((String) code,"http://www.163.com....");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfPrice = CompletableFuture.anyOf(cfpriceSina,cfPrice163);// 最终结果:cfPrice.thenAccept((price) -> {System.out.println("price: " + price);});Thread.sleep(200);}static String queryCode(String name,String url){System.out.println("query Code from " + url + "....");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {// e.printStackTrace();}return "601857";}static Double queryPrice(String code,String url){System.out.println("query price from " + url + "....");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {// e.printStackTrace();}return 5 + Math.random() * 20;}}

除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。最后我们注意CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;

  • xxxAsync():表示将异步在线程池中执行。

小结:CompletableFuture可以指定异步处理流程:

  • thenAccept()处理正常结果;

  • exceptional()处理异常结果;

  • thenApplyAsync()用于串行化另一个CompletableFuture;

  • anyOf()和allOf()用于并行化多个CompletableFuture。

17使用ForkJoin

Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。我们来看如何使用Fork/Join对大数据进行并行求和:

/*** @Auther Mario* @Date 2020-12-03 14:19* @Version 1.0* ForkJoin 测试用例*/
public class TestForkJoin {public static void main(String[] args) throws Exception{long sum = 0l;long[] array = new long[2000];for(int i = 0;i< array.length;i++){array[i] = random();sum += array[i];}System.out.println("Expected sum: " + sum);ForkJoinTask<Long> task = new SumTask(array,0,array.length);long startTime = System.currentTimeMillis();Long result = ForkJoinPool.commonPool().invoke(task);long endTime = System.currentTimeMillis();System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
// System.out.println("总共执行时间是:" + (endTime - startTime));}static Random random = new Random(0);static long random(){return random.nextInt(10000);}}class SumTask extends RecursiveTask<Long>{static final int THRESHOLD = 500;private long[] array;private int start;private int end;public SumTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute(){if(end - start <= THRESHOLD){long result = 0;for(int i= start;i< end ;i++){result += array[i];try {Thread.sleep(1);} catch (InterruptedException e) {// e.printStackTrace();}}return result;}int midle = (start + end)/2;System.out.println(String.format("split %d~%d ==> %d~%d %d~%d", start,end,start,midle,midle,end ));SumTask s1 = new SumTask(this.array,start,midle);SumTask s2 = new SumTask(this.array,midle,end);//involAllinvokeAll(s1,s2);Long r1 = s1.join();Long r2 = s2.join();Long result = r1 + r2;System.out.println("result = " + r1 + " + " + r2 + " ==> " + result);return result;}}

观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。
因此,核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

class SumTask extends RecursiveTask<Long> {protected Long compute() {// “分裂”子任务:SumTask subtask1 = new SumTask(...);SumTask subtask2 = new SumTask(...);// invokeAll会并行运行两个子任务:invokeAll(subtask1, subtask2);// 获得子任务的结果:Long subresult1 = subtask1.join();Long subresult2 = subtask2.join();// 汇总结果:return subresult1 + subresult2;}
}

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
小结:

  • Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

  • ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction。

  • 使用Fork/Join模式可以进行并行计算以提高效率。

18使用ThreadLocal

多线程是Java实现多任务的基础,Thread对象代表一个线程,我们可以在代码中调用Thread.currentThread()获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字:

public class TestThreadLocal {ThreadLocal<User> threadLocalUser = new ThreadLocal<>();User user = new User();public static void main(String[] args) {log("start");new Thread(() -> {log("print...");}).start();new Thread(() -> {log("test");}).start();log("end");}static void log(String s){System.out.println(Thread.currentThread().getName() + s);}}

对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

public void process(User user) {checkPermission();doWork();saveStatus();sendResponse();
}

然后,通过线程池去执行这些任务。多线程一直是Java 面试中常考的基础知识,之前一直没有系统的学习过,这段时间对着廖雪峰大师的讲义从新把该知识内容整理一遍,该文章系列内容全部来源于廖雪峰官方网站Java 基础教程观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

public void process(User user) {checkPermission(user);doWork(user);saveStatus(user);sendResponse(user);
}

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

void doWork(User user) {queryStatus(user);checkStatus();setNewStatus(user);log();
}

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。ThreadLocal实例通常总是以静态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

它的典型使用方式如下:

void processUser(user) {try {threadLocalUser.set(user);step1();step2();} finally {threadLocalUser.remove();}
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

void step1() {User u = threadLocalUser.get();log();printUser();
}void log() {User u = threadLocalUser.get();println(u.name);
}void step2() {User u = threadLocalUser.get();checkUser(u.id);
}

注意到普通的方法调用一定是同一个线程执行的,所以,step1()、step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:

Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。
最后,特别注意ThreadLocal一定要在finally中清除:

try {threadLocalUser.set(user);...
} finally {threadLocalUser.remove();
}

这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {…}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:

public class UserContext implements AutoCloseable {static final ThreadLocal<String> ctx = new ThreadLocal<>();public UserContext(String user) {ctx.set(user);}public static String currentUser() {return ctx.get();}@Overridepublic void close() {ctx.remove();}
}

使用的时候,我们借助try (resource) {…}结构,可以这么写:

try (var ctx = new UserContext("Bob")) {// 可任意调用UserContext.currentUser():String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象

这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {…}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。

小结:

  • ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

  • ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

  • 使用ThreadLocal要用try … finally结构,并在finally中清除。

牧马人,公众号:编程牧马人Java 多线程基础(下)

文案@牧马人 编辑@牧马人

Java多线程基础(下)相关推荐

  1. Java多线程干货系列(1):Java多线程基础

    转载自  Java多线程干货系列(1):Java多线程基础 前言 多线程并发编程是Java编程中重要的一块内容,也是面试重点覆盖区域,所以学好多线程并发编程对我们来说极其重要,下面跟我一起开启本次的学 ...

  2. JAVA多线程基础篇-关键字synchronized

    1.概述 syncronized是JAVA多线程开发中一个重要的知识点,涉及到多线程开发,多多少少都使用过.那么syncronized底层是如何实现的?为什么加了它就能实现资源串行访问?本文将基于上述 ...

  3. 爬梯:Java多线程基础

    学习资源:狂神说 Java多线程基础 1.多线程概述 Process 进程 一个进程可以有多个线程. Thread 线程 线程就是独立的执行路径 在程序运行时,即使没有自己创建线程,后台也会有多个线程 ...

  4. java多线程基础学习[狂神说java-多线程笔记]

    java多线程基础学习 一.线程简介 1.类比 2.程序进程线程 3.线程的核心概念 二.线程的实现(重点) 调用方法与调用多线程的区别 Thread 类 1.thread使用方法 2. 代码实现 3 ...

  5. Java多线程干货系列—(一)Java多线程基础

    前言 多线程并发编程是Java编程中重要的一块内容,也是面试重点覆盖区域,所以学好多线程并发编程对我们来说极其重要,下面跟我一起开启本次的学习之旅吧. 正文 线程与进程 1 线程:进程中负责程序执行的 ...

  6. 一篇文章弄懂Java多线程基础和Java内存模型

    文章目录 一.多线程的生命周期及五种基本状态 二.Java多线程的创建及启动 1.继承Thread类,重写该类的run()方法 2.通过实现Runnable接口创建线程类 3.通过Callable和F ...

  7. java多线程基础视频_【No996】2020年最新 Java多线程编程核心基础视频课程

    01.课程介绍.mp4 02.多线程编程基础-进程与线程.mp4 03.多线程编程基础-使用多线程-继承Thread类.mp4 04.多线程编程基础-使用多线程-实现Runnable接口.mp4 05 ...

  8. 最新最全的java多线程基础总结(上)

    感觉对你有用,右边点个赞哈! 知识点 应该了解的概念 1. 线程与进程 进程是指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程中可以启动多个线程.比如在 Windows 系统中 ...

  9. 【Java进阶营】JAVA多线程基础学习二:synchronized

    本篇主要介绍Java多线程中的同步,也就是如何在Java语言中写出线程安全的程序,如何在Java语言中解决非线程安全的相关问题,没错就是使用synchronized. 一.如何解决线程安全问题? 一般 ...

  10. Java多线程基础篇(02)-多线程的实现

    为什么80%的码农都做不了架构师?>>>    1.概要 JAVA多线程实现方式主要有三种:继承Thread类.实现Runnable接口.使用ExecutorService.Call ...

最新文章

  1. springboot打成jar后获取classpath下的文件
  2. 程序员因中年危机从北京回老家事业单位:工资从60万爆降到6万
  3. java好还是python好-学python好还是java好
  4. 大型网站的负载均衡器、db proxy和db
  5. Hibernate中二级缓存配置
  6. HTML页面背景音乐控制
  7. ORACLE DATAGURARD配置手记
  8. python队列只能一个个读取吗_python队列Queue的详解
  9. 【C语言重点难点精讲】关键字精讲
  10. 三阶段dea模型 matlab源程序,三阶段DEA模型SFA二阶段剔除过程
  11. 使用.NET进行高效率互联网敏捷开发的思考和探索【一、概述】
  12. pcb文件转成原理图_初学PCB设计,到底该学习哪款软件?
  13. 怎么恢复佳能相机SD卡CF卡误删除格式化的MOV视频
  14. win10系统魔兽世界无法连接服务器地址,win10系统魔兽世界无法启动3d加速怎么解决...
  15. 【Kepware与S71500连接 返回协议错误0X81解决方案】
  16. 《黑客与画家》摘要读后感
  17. java 获取流 丢失_java文件流数据丢失问题
  18. 一、PS是PhotoShop的缩写
  19. 基于视频的电熔镁炉工况识别系统→6.电熔镁炉服务器设计
  20. 39-程序中的三国天下

热门文章

  1. 计算机特殊社会环境,计算机应用职业生涯规划书
  2. 丽博版魔都家居图鉴:如何住进《三十而已》的精致家
  3. JAVA实现百度网盘文件上传
  4. C语言练习之打印9*9乘法口诀表
  5. esp32(ROS2foxy)之飞龙在天turtlesim最快能多快???
  6. 快来喝杯Java(初级第一章)
  7. 【plotly+ datashader+mapbox】Uber纽约上车点可视化/解决超大量地理数据可视化
  8. 学习计算机it编程的 10 大好处
  9. xlsx文件怎么打开?3种方法:Excel+WPS+兼容包来搞定
  10. Linux:设置文件夹权限之777的含义