并发编程入门(五):Java并发包和Java8并发
目录
前言
JUC(Java.util.concurrent)
1.Java并发包之原子类
1.1.AtomicInteger
1.2.AtomicReference
1.3.AtomicStampedReference
1.4.AtomicArray
2.Java并发包之工具类
2.1.CountDownLatch
2.2.CycilerBarrier
2.3.ExChanger
3.Java并发包之并发容器
3.1.BlockQueue
3.2.阻塞式写方法
3.3.非阻塞式写方法
3.4.阻塞式读方法
3.5.非阻塞式读方法
3.6.实现生产者-消费者
4.小结
Java8与并发
1.并行流与并行排序
1.1.使用并行流过滤数据
1.2.从集合中获取并行流
2.增强的Future:CompletableFuture
2.1.完成了就通知我
2.2.异步执行任务
2.3.流式调用
2.4.处理异常
3.小结
多线程究竟有什么用?
前言
本文是《Java并发视频入门》视频课程的笔记总结,旨在帮助更多同学入门并发编程。
本系列共五篇博客,本篇博客着重聊并发编程工具类JUC、Java8与并发。
侵权删。原视频地址:
【多线程】Java并发编程入门_哔哩哔哩_bilibili本课程是Java并发编程入门版课程,适合基础薄弱的同学进行学习。如需完整版请移步腾讯课堂进行购买:https://ke.qq.com/course/3486171?tuin=5e1f405a更多优质课程请上腾讯课堂搜索“雷俊华老师”:https://ke.qq.com/course/3486171?tuin=5e1f405ahttps://www.bilibili.com/video/BV1a5411u7o7?p=1
博客汇总地址:
《并发编程入门》总结篇_勤修的博客-CSDN博客本文是《Java并发视频入门》视频课程的笔记总结,旨在帮助更多同学入门并发编程。本系列共五篇博客,此文为五篇博客的汇总篇。https://kevinhe.blog.csdn.net/article/details/125143179
JUC(Java.util.concurrent)
并发编程使用的工具类,除了线程池、显式锁,还包括原子类、工具类和容器类。
1.Java并发包之原子类
volatile修饰基本/引用类型后,具备了可见性和有序性,但无法保证原子性。原子性可通过Sychronized关键字来保证。执行简单操作加锁过于大材小用。
比如执行i++,可以通过原子类工具集。比如AtomicInteger,这些类是线程安全的。
1.1.AtomicInteger
AtomicInteger是线程安全的,除此之外,还有AtomicBoolean、AtomicLong。
线程不安全代码如下:
public class Test1 {private static int value = 0;public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(new Runnable() {@Overridepublic void run() {for (int j = 0; j < 10; j++) {System.out.println(value++);try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}}
}
结果输出:
0
...
10
12
11
11
15
15
...
优化后的线程安全代码:
public class Test1 {private static AtomicInteger value = new AtomicInteger(0);public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(new Runnable() {@Overridepublic void run() {for (int j = 0; j < 10; j++) {System.out.println(value.addAndGet(2));
// System.out.println(value.getAndIncrement());try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}}
}
结果输出:
2
6
4
8
10
12
...
1.2.AtomicReference
针对对象引用的非阻塞原子性操作。
应用场景:个人银行账号。几点要求:1.个人账号被设计为不可变对象;2.只包含账号名、现金;3.便于验证,资金只增不减。
假设此时有十个人往这个账号打钱,每个人每次打10元,多线程来实现。
线程不安全代码:
public class Test2 {private static volatile BankAccount bankAccount = new BankAccount("kevin", 0);public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(()-> {// 读取引用final BankAccount account = bankAccount;// 创建新对象BankAccount newAccount = new BankAccount(account.getAccount(), account.getAmount() + 10);System.out.println(newAccount);bankAccount = newAccount;try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}class BankAccount {private final String account;private final int amount;public BankAccount(String account, int amount) {this.account = account;this.amount = amount;}public int getAmount() {return amount;}public String getAccount() {return account;}@Overridepublic String toString() {return "BankCount{" +"account='" + account + '\'' +", amount=" + amount +'}';}
}
结果输出:
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=10}
线程安全代码,但仍有问题:
public class Test2 {private static AtomicReference<BankAccount> reference = new AtomicReference<>(new BankAccount("kevin", 0));public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {// 读取引用final BankAccount account = reference.get();// 创建新对象BankAccount newAccount = new BankAccount(account.getAccount(), account.getAmount() + 10);// CAS更换if (reference.compareAndSet(account, newAccount)) {System.out.println(newAccount);}try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}
}class BankAccount {private final String account;private final int amount;public BankAccount(String account, int amount) {this.account = account;this.amount = amount;}public int getAmount() {return amount;}public String getAccount() {return account;}@Overridepublic String toString() {return "BankCount{" +"account='" + account + '\'' +", amount=" + amount +'}';}
}
结果输出:
BankCount{account='kevin', amount=40}
BankCount{account='kevin', amount=60}
BankCount{account='kevin', amount=10}
BankCount{account='kevin', amount=20}
BankCount{account='kevin', amount=30}
BankCount{account='kevin', amount=50}
BankCount{account='kevin', amount=80}
BankCount{account='kevin', amount=70}
1.3.AtomicStampedReference
ABA问题:一开始是A、后来变成B,再后来又变成A。那么CAS可能没法进行辨别,如何避免?加入版本号概念,使用AtomicStampedReference。
public class Test3 {private static AtomicStampedReference<String> reference = new AtomicStampedReference<>("Hello", 0);public static void main(String[] args) {reference.compareAndSet("Hello", "world", 0, 1);String r = Test3.reference.getReference();System.out.println(r);}
}
结果输出:
world
1.4.AtomicArray
原子性操作数组,比如AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。
public class Test4 {public static void main(String[] args) {int []array = {1,2,3,4,5,6,7,8,9};AtomicIntegerArray integerArray = new AtomicIntegerArray(array);int andIncrement = integerArray.getAndIncrement(0);System.out.println(andIncrement);System.out.println(integerArray);}
}
结果输出:
1
[2, 2, 3, 4, 5, 6, 7, 8, 9]
2.Java并发包之工具类
2.1.CountDownLatch
子任务均结束后,当前主任务才会进入下一个阶段。
类似于倒计时阀门,有一个门阀等待倒计数,计数器为0时才打开。
eg:计算商品价格。
public class Test5 {public static void main(String[] args) throws InterruptedException {int[] productIds = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};List<ProducePrice> priceList = Arrays.stream(productIds).mapToObj(ProducePrice::new).collect(Collectors.toList());//创建CountDownLatch,容量为子任务数量CountDownLatch latch = new CountDownLatch(priceList.size());//创建子任务priceList.forEach( p -> {new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println(p.getProductId() + "开始计算价格");TimeUnit.SECONDS.sleep(new Random().nextInt(10));p.setPrice(p.getProductId() * 0.9D);} catch (InterruptedException e) {e.printStackTrace();} finally {//计数器减一,子任务减一。latch.countDown();}}}).start();});//await开始等待,主任务阻塞,直到latch.countDown()=0latch.await();System.out.println("所有价格计算完毕");priceList.forEach(System.out::println);}private static class ProducePrice {private int productId;private double price;public ProducePrice(int productId) {this.productId = productId;}public ProducePrice(int productId, double price) {this.price = price;this.productId = productId;}public void setPrice(double price) {this.price = price;}public double getPrice() {return price;}public int getProductId() {return productId;}@Overridepublic String toString() {return "ProducePrice{" +"productId=" + productId +", price=" + price +'}';}}
}
结果输出:
4开始计算价格
9开始计算价格
10开始计算价格
3开始计算价格
1开始计算价格
5开始计算价格
7开始计算价格
6开始计算价格
8开始计算价格
2开始计算价格
所有价格计算完毕
ProducePrice{productId=1, price=0.9}
ProducePrice{productId=2, price=1.8}
ProducePrice{productId=3, price=2.7}
ProducePrice{productId=4, price=3.6}
ProducePrice{productId=5, price=4.5}
ProducePrice{productId=6, price=5.4}
ProducePrice{productId=7, price=6.3}
ProducePrice{productId=8, price=7.2}
ProducePrice{productId=9, price=8.1}
ProducePrice{productId=10, price=9.0}
2.2.CycilerBarrier
循环屏障,允许多个线程在执行完相应的操作后彼此等待共同到达一个障点。与CountDownLatch类似,有一个不同点:CountDownLatch不可重复利用,CycilerBarrier可重复利用。
快速上手
与CountDownLatch的“快速上手”类似。
public class Test6 {public static void main(String[] args) {int[] productIds = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};List<ProducePrice> priceList = Arrays.stream(productIds).mapToObj(ProducePrice::new).collect(Collectors.toList());//参数为分片,而非计数器。CyclicBarrier barrier = new CyclicBarrier(priceList.size());List<Thread> threadList = new ArrayList<>();priceList.forEach(p -> {Thread thread = new Thread(() -> {try {System.out.println(p.getProductId() + "开始计算价格");TimeUnit.SECONDS.sleep(new Random().nextInt(10));p.setPrice(p.getProductId() * 0.9D);} catch (InterruptedException e) {e.printStackTrace();} finally {//等待其他子任务也执行到这个障点try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});threadList.add(thread);thread.start();});threadList.forEach( t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("所有价格计算完毕");priceList.forEach(System.out::println);}private static class ProducePrice {private int productId;private double price;public ProducePrice(int productId) {this.productId = productId;}public ProducePrice(int productId, double price) {this.price = price;this.productId = productId;}public void setPrice(double price) {this.price = price;}public double getPrice() {return price;}public int getProductId() {return productId;}@Overridepublic String toString() {return "ProducePrice{" +"productId=" + productId +", price=" + price +'}';}}
}
结果输出:
7开始计算价格
2开始计算价格
9开始计算价格
5开始计算价格
10开始计算价格
1开始计算价格
4开始计算价格
3开始计算价格
8开始计算价格
6开始计算价格
所有价格计算完毕
ProducePrice{productId=1, price=0.9}
ProducePrice{productId=2, price=1.8}
ProducePrice{productId=3, price=2.7}
ProducePrice{productId=4, price=3.6}
ProducePrice{productId=5, price=4.5}
ProducePrice{productId=6, price=5.4}
ProducePrice{productId=7, price=6.3}
ProducePrice{productId=8, price=7.2}
ProducePrice{productId=9, price=8.1}
ProducePrice{productId=10, price=9.0}
循环使用
无需显式重置,内部到零后重置。需要确保能被整除完。
旅游团上车之前(障点)清点人数,到达站点之后(障点)再次清点人数,写个程序模拟下。
public class Test7 {public static void main(String[] args) throws BrokenBarrierException, InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(4);for (int i = 0; i < 3; i++) {new Thread(new Tourist(i, cyclicBarrier)).start();}cyclicBarrier.await();System.out.println("所有人已经上车");cyclicBarrier.await();System.out.println("所有人已经下车");}private static class Tourist implements Runnable {private final int id;private final CyclicBarrier barrier;public Tourist(int id, CyclicBarrier barrier) {this.id = id;this.barrier = barrier;}@Overridepublic void run() {System.out.println("游客" + id + "上车");try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("游客" + id + "已经上车,等待其他人上车");try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("游客" + id + "等待上车结束");try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}System.out.println("游客" + id + "已经下车,等待其他人下车");try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("游客" + id + "等待下车结束");}}
}
结果输出:
游客1上车
游客2上车
游客0上车
游客2已经上车,等待其他人上车
游客0已经上车,等待其他人上车
游客1已经上车,等待其他人上车
所有人已经上车
游客1等待上车结束
游客0等待上车结束
游客2等待上车结束
游客1已经下车,等待其他人下车
游客0已经下车,等待其他人下车
游客2已经下车,等待其他人下车
所有人已经下车
游客2等待下车结束
游客0等待下车结束
游客1等待下车结束
与CountDownLatch的区别:
CountDownLatch:一个线程,等待另外N个线程执行完后才能执行;(一个线程等待多个线程)
CycilerBarrier:N个线程相互等待,任何一个线程完成之前,所有线程必须等待。(多个线程互相等待)
Phaser:
可重复使用的同步屏障,功能非常类似于CycilerBarrier和CountDownLatch的合集。解决了CountDownLatch手动重置的问题;解决了CycilerBarrier一旦制定Size无法改变的问题。一般不建议使用。
2.3.ExChanger
ExChanger简化了两个线程间的数据交互,提供了两个线程的数据交换点。交换两个线程提供给对方的数据。
快速上手:可以看做生产者-消费者模式的实现,关注重点是数据交换。等待3s后交换。
public class Test9 {public static void main(String[] args) {Exchanger<String> exchanger = new Exchanger<>();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("T1启动了");try {TimeUnit.SECONDS.sleep(1);String data = exchanger.exchange("我是T1数据");System.out.println("从T2获取的数据" + data);} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("T2启动了");try {TimeUnit.SECONDS.sleep(3);String data = exchanger.exchange("我是T2数据");System.out.println("从T1获取的数据" + data);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}
结果输出:
T1启动了
T2启动了
从T1获取的数据:我是T1数据
从T2获取的数据:我是T2数据
3.Java并发包之并发容器
3.1.BlockQueue
元素数量存在界限,队列满时,写操作线程将会被阻塞挂起,队列空时,读操作线程将会被阻塞挂起。类似于生产者-消费者。内部实现主要依赖于显式Lock和Condition。
ArrayBlockQueue是基于数组实现的FIFO阻塞队列,分为两种不同写法:阻塞式和非阻塞式。
3.2.阻塞式写方法
Put:阻塞写操作;Offer:阻塞写,但有超时时间。
public class Test12 {public static void main(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作,若超时1s,即超过指定时间,则写入失败boolean a = queue.offer("a", 1, TimeUnit.SECONDS);System.out.println(a);boolean b = queue.offer("b", 1, TimeUnit.SECONDS);System.out.println(b);boolean c = queue.offer("c", 1, TimeUnit.SECONDS);System.out.println(c);}public static void test1(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作, 尾部写入。queue.put("a");System.out.println("a添加成功");queue.put("b");System.out.println("b添加成功");new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);//读操作queue.take();} catch (InterruptedException e) {e.printStackTrace();}}}).start();queue.put("c");System.out.println("c添加成功");}
}
结果输出:
case1:put操作
a添加成功
b添加成功case1:put操作后take,类似于test1
a添加成功
b添加成功
c添加成功case2:offer(XX)操作
true
true
false
3.3.非阻塞式写方法
offer/add:非阻塞式写
public class Test12 {public static void main(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作boolean a = queue.offer("a");System.out.println(a);boolean b = queue.offer("b");System.out.println(b);boolean c = queue.offer("c");System.out.println(c);}public static void test3(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作, 尾部写入。queue.add("a");System.out.println("a添加成功");queue.add("b");System.out.println("b添加成功");queue.add("c");System.out.println("c添加成功");}
}
结果输出:
case1:offer方法
true
true
falsecase1:put方法
a添加成功
b添加成功
Exception in thread "main" java.lang.IllegalStateException: Queue fullat java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)at com.company.unit9.Test12.main(Test12.java:25)
3.4.阻塞式读方法
take:头部获取数据并移除,当队列为空时,会进入阻塞。
poll:超时时间后,退出阻塞。
public class Test13 {public static void main(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作, 尾部写入。queue.add("a");queue.add("b");String a = queue.poll(1, TimeUnit.SECONDS);System.out.println(a);String b = queue.poll(1, TimeUnit.SECONDS);System.out.println(b);String c = queue.poll(1, TimeUnit.SECONDS);System.out.println(c);}public static void test1(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作, 尾部写入。queue.add("a");queue.add("b");String a = queue.take();System.out.println(a);String b = queue.take();System.out.println(b);new Thread(()-> {try {TimeUnit.SECONDS.sleep(1);queue.offer("c");} catch (InterruptedException e) {e.printStackTrace();}}).start();String c = queue.take();System.out.println(c);}
}
结果输出:
case1:poll(X,X)方法
a
b
nullcase2:take方法,test1
a
b
c
3.5.非阻塞式读方法
poll:非阻塞式读方法,移除数据;
peek:非阻塞式读方法,不会移除数据。
public class Test13 {public static void main(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作, 尾部写入。queue.add("a");queue.add("b");String a = queue.peek();System.out.println(a);String b = queue.peek();System.out.println(b);String c = queue.peek();System.out.println(c);}public static void test3(String[] args) throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//写操作, 尾部写入。queue.add("a");queue.add("b");String a = queue.poll();System.out.println(a);String b = queue.poll();System.out.println(b);String c = queue.poll();System.out.println(c);}}
结果输出:
case1:peek方法
a
a
acase2:poll方法
a
b
null
3.6.实现生产者-消费者
public class Test14 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {String s = System.currentTimeMillis() + "";queue.offer(s);System.out.println(Thread.currentThread() + "生产数据" + s);try {TimeUnit.SECONDS.sleep(new Random().nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}}}).start();}for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {try {String s = queue.take();System.out.println(Thread.currentThread() + "读取数据" + s);TimeUnit.SECONDS.sleep(new Random().nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}
}
结果输出:
Thread[Thread-7,5,main]生产数据1654479198979
Thread[Thread-3,5,main]生产数据1654479198977
Thread[Thread-15,5,main]读取数据1654479198981
Thread[Thread-12,5,main]读取数据1654479198977
Thread[Thread-11,5,main]读取数据1654479198977
Thread[Thread-17,5,main]读取数据1654479198978
Thread[Thread-0,5,main]生产数据1654479198977
Thread[Thread-15,5,main]读取数据1654479199013
Thread[Thread-6,5,main]生产数据1654479198978
4.小结
本节我们介绍了Java并发包。共分为:原子类(AtomicInteger、AtomicReference、AtomicStampedReference和AtomicArray)、工具类(CountDownLatch、CycilerBarrier和ExChanger)和并发容器(基于ArrayBlockingQueue的阻塞/非阻塞写、阻塞/非阻塞读、生产者-消费者模型)。
Java8与并发
1.并行流与并行排序
1.1.使用并行流过滤数据
public class Test1 {public static void main(String[] args) {//流式编程,串行计算long time1 = System.currentTimeMillis();long count = IntStream.range(1, 10000000).filter(PrimUtils::isPrime).count();long time2 = System.currentTimeMillis();System.out.println(count);//流式编程,并行流,性能会有提升。long parallelCount = IntStream.range(1, 10000000).parallel().filter(PrimUtils::isPrime).count();long time3 = System.currentTimeMillis();System.out.println(parallelCount);System.out.println(time2 - time1);System.out.println(time3- time2);}private static class PrimUtils {public static boolean isPrime(int number) {int tmp = number;if (tmp < 2) {return false;}for (int i = 2; Math.sqrt(tmp) >= i; i++) {if (tmp % i == 0) {return false;}}return true;}}
}
结果输出:
664579
664579
5476
869
1.2.从集合中获取并行流
public class Test2 {public static void main(String[] args) {List<Student> students = new ArrayList<>();for (int i = 0; i < 1000000; i++) {students.add(new Student(new Random().nextInt(100)));}//串行流,lambda + 流式编程double score = students.stream().mapToInt(s -> s.score).average().getAsDouble();System.out.println(score);//并行流:parallelStreamdouble score2 = students.parallelStream().mapToInt(s -> s.score).average().getAsDouble();System.out.println(score2);}private static class Student {private int score;private Student(int score) {this.score = score;}}
}
结果输出:
49.546835
49.546835
还有并行排序,从Arrays.sort改为Arrays.parallelSort()。
2.增强的Future:CompletableFuture
2.1.完成了就通知我
public class Test5 {public static void main(String[] args) throws InterruptedException {CompletableFuture<Integer> completableFuture = new CompletableFuture<>();new Thread(new AskThread(completableFuture)).start();TimeUnit.SECONDS.sleep(1);completableFuture.complete(60);}public static class AskThread implements Runnable {CompletableFuture<Integer> re = null;public AskThread(CompletableFuture<Integer> re) {this.re = re;}@Overridepublic void run() {int myRe = 0;try {//get作用:阻塞到complete执行完毕,myRe = re.get() * re.get();} catch (Exception e) {e.printStackTrace();}System.out.println(myRe);}}
}
结果输出:
3600
2.2.异步执行任务
public class Test51 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//异步执行,新建线程return calc(50);});//调用get会阻塞System.out.println(future.get());}private static Integer calc(Integer param) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return param * param;}
}
结果输出:
2500
2.3.流式调用
public class Test6 {public static void main(String[] args) throws ExecutionException, InterruptedException {//链式调用CompletableFuture<Void> future = CompletableFuture.supplyAsync(() ->//异步执行,新建线程calc(50)).thenApply(e -> {System.out.println("在supply Async中处理的值是:" + e);return Integer.toString(e);}).thenApply(e -> {System.out.println("在thenApply中处理的值是:" + e);return "\""+ e + "\"";}).thenAccept(System.out::println);//调用get阻塞,直到CompletableFuture所有任务执行完毕为止。future.get();}private static Integer calc(Integer param) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return param * param;}
}
结果输出:
在supply Async中处理的值是:2500
在thenApply中处理的值是:2500
"2500"
2.4.处理异常
public class Test6 {public static void main(String[] args) throws ExecutionException, InterruptedException {//链式调用CompletableFuture<Void> future = CompletableFuture.supplyAsync(() ->//异步执行,新建线程calc(50)).exceptionally(ex -> {System.out.println("计算出现异常" + ex);System.out.println("即将返回零");return 0;}).thenApply(e -> {System.out.println("在supply Async中处理的值是:" + e);return Integer.toString(e);}).thenApply(e -> {System.out.println("在thenApply中处理的值是:" + e);return "\""+ e + "\"";}).thenAccept(System.out::println);//调用get阻塞,直到CompletableFuture所有任务执行完毕为止。future.get();}private static Integer calc(Integer param) {return param / 0;}
}
结果输出:
计算出现异常java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
即将返回零
在supply Async中处理的值是:0
在thenApply中处理的值是:0
"0"
3.小结
本节我们介绍了并行流与并行排序、增强的Future:CompletableFuture。
多线程究竟有什么用?
针对百万数据一次性导出、调用高德接口进行路径处理推荐三条路线、上千用户同时导出数据等场景,多线程、线程池(配置两个线程池,一个线程池提供给普通用户,一个线程池提供给VIP用户)会提高执行效率。
并发编程入门(五):Java并发包和Java8并发相关推荐
- 《Java并发编程入门与高并发面试》or 《Java并发编程与高并发解决方案》笔记
<Java并发编程入门与高并发面试>or <Java并发编程与高并发解决方案>笔记 参考文章: (1)<Java并发编程入门与高并发面试>or <Java并发 ...
- Java零基础并发编程入门
Java零基础并发编程入门 并发编程主要包括: 线程,同步,future,锁,fork/join, volatile,信号量,cas(原子性,可见性,顺序一致性),临界性,分布式 了解基础: JMM: ...
- java并发编程第五课 并发锁讲解一
第19讲:你知道哪几种锁?分别有什么特点? 本课时我们首先会对锁的分类有一个整体的概念,了解锁究竟有哪些分类标准.然后在后续的课程中,会对其中重要的锁进行详细讲解. 锁的 7 大分类 需要首先指出的是 ...
- java并发编程入门_Java并发编程入门,看这一篇就够了
2.3 资源限制的挑战 什么是资源限制 资源限制指在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源. 硬件资源包括:带宽的上传下载速度.硬盘读写速度和CPU的处理速度等 软件资源包括:线 ...
- 脑残式网络编程入门(五):每天都在用的Ping命令,它到底是什么?
本文引用了公众号纯洁的微笑作者奎哥的技术文章,感谢原作者的分享. 1.前言 老于网络编程熟手来说,在测试和部署网络通信应用(比如IM聊天.实时音视频等)时,如果发现网络连接超时,第一时间想到的就是使用 ...
- 电脑编程入门自学java_电脑编程入门自学Java指南
随着Java近些年来的强劲发展,想要转行学习Java的初学者也越来越多了.然而,入门自学Java并不是一件轻松的事情.众所周知,万事开头难,尤其是没有编程语言基础的学习者,不仅仅需要付出更多的心血和汗 ...
- 并发编程(五)python实现生产者消费者模式多线程爬虫
并发编程专栏系列博客 并发编程(一)python并发编程简介 并发编程(二)怎样选择多线程多进程和多协程 并发编程(三)Python编程慢的罪魁祸首.全局解释器锁GIL 并发编程(四)如何使用多线程, ...
- 并发编程5:Java 阻塞队列源码分析(下)
上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 PriorityBlockingQueue,这篇文 ...
- Java并发编程(五)JVM指令重排
我是不是学了一门假的java...... 引言:在Java中看似顺序的代码在JVM中,可能会出现编译器或者CPU对这些操作指令进行了重新排序:在特定情况下,指令重排将会给我们的程序带来不确定的结果.. ...
最新文章
- vue全家桶+Koa2开发笔记(5)--nuxt
- python连接spark_python如何通过pyspark的API操作spark
- Ant Design Vue 中 Tree 树形控件 defaultExpandAll 设置无效
- 线条边框简笔画图片大全_简笔画猪 手抄报图片边框版式大全
- 使用扩展存储过程xp_regread读取注册表信息
- NUMA全称 Non-Uniform Memory Access,译为“非一致性内存访问”,积极NUMA内存策略
- 千人千面之3D立体个人数据营销
- mysql 常用字段类型_mysql 常用字段类型
- c语言修改内存字节,要开学了,小白给大家一个C语言修改dnf内存的示范代码
- 1688商品类目API接口-(item_cat_get-获得1688商品类目接口)
- Z05 - 033、访客分析 - 独立访客
- 8.系统研发中的领导意志
- ios工程-如何添加mixpanel来实现统计用户的点击量、玩游戏时长
- Java实现图像增强之伽马变换
- java怎么实现华为云文字识别,OCR文字识别服务快速入门教程
- supermap mysql_超图supermap sdx数据库用sql实现空间查询
- python好看图案的编程代码_利用Python绘制了一些有意思的图案
- 详细介绍@GetMapping和@PostMapping的区别
- [乡土民间故事_徐苟三传奇]第十六回_差狗子认输吃大粪
- jsp格式date类型