使用多线程的意义:

多线程能合理充分地利用CPU资源,去实现任务的并行处理,从而提升程序性能

适合使用多线程的场景

1. IO的处理:网络IO,文件IO,用线程池异步提升响应速度

2. Tomcat8.0之前,默认的IO模型bio模式下,使用线程池异步处理accept()到的每个socket

支付场景

public class AsyncPaymentService {// 构建阻塞队列LinkedBlockingQueue<PaymentRequest> taskQueues = new LinkedBlockingQueue<PaymentRequest>(100);// 单线程池final ExecutorService singleThread = Executors.newSingleThreadExecutor();// 第三方支付服务ThirdPaymentService paymentService = new ThirdPaymentService();// 控制线程运行的属性private volatile boolean status = true;public String submitPay(PaymentRequest request){taskQueues.add(request);return "PROCESSING";}@PostConstructpublic void init(){singleThread.execute(()->{while (status){try{// 阻塞式获取队列中的,支付请求PaymentRequest request = taskQueues.take();String payResult = paymentService.doPay(request);System.out.println("支付处理结果:"+payResult);}catch (Exception e){e.printStackTrace();// 处理支付异常问题}}});}
}

注:

1. LinkedBlockingQueue初始化固定长度后,放入元素超过容量,

会抛异常:java.lang.IllegalStateException: Queue full

一、CountDownLatch

CountDownLatch的用法

CountDownLatch典型用法1:某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为n new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

CountDownLatch典型用法2:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

案例一:

        ExecutorService pool = Executors.newFixedThreadPool(200);CountDownLatch waitBefore = new CountDownLatch(7000);CountDownLatch afterBefore = new CountDownLatch(7000);long now = System.currentTimeMillis();for (int i=0;i<7000;i++){pool.execute(()->{try {waitBefore.await();TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}finally {afterBefore.countDown();}});waitBefore.countDown();}afterBefore.await();System.out.println("7000个任务执行完的总耗时:"+(System.currentTimeMillis() - now));

搞个线程池,线程数是 200。然后提交 7000 个任务,每个任务耗时 100ms,用 CountDownLatch 模拟了一下并发,在我的 机器上运行耗时 3.8s 的样子

上面这个例子,实际上就把CountDownLatch的两种用法都展示出来的,用法一是,做发令枪当所有7000个线程一起开始执行;用法二是,等所有7000个子线程都执行完后,主线程再开始执行;

案例二

rocketmq内,每个broker将自己注册到所有的nameserver时

public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean compressed) {final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {// 注册完一个nameserver,计数器减一countDownLatch.countDown();}}});}// 等待broker给所有的nameserver都注册完成后,再返回try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;}

二、CompletableFuture 

1. 使用CompletableFuture 将串行调用多个远程接口,改成并行调用,提高大接口性能

ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new LinkedBlockingDeque<>(1));public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {final UserInfo userInfo = new UserInfo();CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {getRemoteUserAndFill(id, userInfo);return Boolean.TRUE;}, executor);CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {getRemoteBonusAndFill(id, userInfo);return Boolean.TRUE;}, executor);// 执行到这里,回抛java.util.concurrent.RejectedExecutionException异常CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {getRemoteGrowthAndFill(id, userInfo);return Boolean.TRUE;}, executor);CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();userFuture.get();bonusFuture.get();growthFuture.get();return userInfo;
}

ps:

当使用自定义线程池时,如果放入的任务超过线程池能容忍的上线,任然会抛RejectedExecutionException异常;

三、线程池

1)开10个线程,同时往单表中插入

疯狂试探mysql单表insert极限:已实现每秒插入8.5w条数据_成都彭于晏-CSDN博客

5000w行数据,大约占用5G内存空间

Semaphore 信号量限流,这东西真管用吗?

1)注意信号量获取的,阻塞方法和超时方法

2)多线程阻塞时,可能会拖垮Tomcat服务的tps

ArrayList的线程不安全案例

semaphore信号量支持多线程进入临界区,所以,执行ArrayList的add和remove方法时,可能时多线程并发执行的

public class ExecutorTest {public static void main(String[] args)  {ExecutorService service = Executors.newFixedThreadPool(20);AtomicInteger count = new AtomicInteger();ObjectPool<Integer,String> pool = new ObjectPool<>(10,2);for (int i=0;i<1000000;i++){service.execute( () -> {pool.exec(t -> {System.out.println("当前线程为:"+Thread.currentThread().getName() + ",执行第:"+count.incrementAndGet()+"个任务");System.out.println(t);try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}return t.toString();});});}}}class ObjectPool<T,R>{List<T> poolList;Semaphore semaphore;public ObjectPool(int size,T t){/**用ArrayList,下面就会抛数组越界异常因为,此时ArrayList,就成为了多线程并发修改的共享资源*/pool = new ArrayList<>();
//        pool = new Vector<>();for (int i=0;i<size;i++){pool.add(t);}semaphore = new Semaphore(size);}public R exec(Function<T,R> function){T t = null;try{semaphore.acquire();t = poolList.remove(0);return function.apply(t);}catch (Exception e){System.out.println("============================== 完了,数组越界啦 ==============================");e.printStackTrace();}finally {pool.add(t);semaphore.release();}return null;}}

线程同步版块

wait/notify方法应用

题目:

一个文件中有10000个数,用Java实现一个多线程程序将这个10000个数输出到5个不用文件中(不要求输出到每个文件中的数量相同)。要求启动10个线程,两两一组,分为5组。每组两个线程分别将文件中的奇数和偶数输出到该组对应的一个文件中,需要偶数线程每打印10个偶数以后,就将奇数线程打印10个奇数,如此交替进行。同时需要记录输出进度,每完成1000个数就在控制台中打印当前完成数量,并在所有线程结束后,在控制台打印”Done”.

参考:https://blog.csdn.net/qq_42449963/article/details/103482405

public class Test {public static void main(String[] args) {PrintWriter printWriter = null;PrintWriter writer = null;BufferedReader bufferedReader = null;try {printWriter = new PrintWriter("input.txt");Random random = new Random();for (int i = 0; i < 10000; i++) {int nextInt = random.nextInt(1000);printWriter.print(nextInt + " ");}printWriter.flush();bufferedReader = new BufferedReader(new FileReader("input.txt"));String s = null;StringBuilder str = new StringBuilder();while ((s = bufferedReader.readLine()) != null) {str.append(s);}String[] split = str.toString().split(" ");int m = 0;for (int i = 0; i < 5; i++) {int[] records = new int[2000];for (int j = 0; j < 2000; j++) {records[j] = Integer.parseInt(split[m]);m++;}writer = new PrintWriter("output" + i + ".txt");MyRunnable myRunnable = new MyRunnable(records, writer);new Thread(myRunnable).start();new Thread(myRunnable).start();}} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {if (printWriter != null) {printWriter.close();}if (writer != null) {printWriter.close();}if (bufferedReader != null) {try {bufferedReader.close();} catch (IOException e) {e.printStackTrace();}}}}
}

任务类


class MyRunnable implements Runnable {private static int count = 0;private static final Object lock = new Object();private static final int EVEN = 0;private static final int ODD = -1; private int type;private PrintWriter writer;private int[] records;private int evenPoint = 0;private int oddPoint = 0;public MyRunnable(int[] records, PrintWriter writer) {this.records = records;this.writer = writer;this.type = EVEN;}@Overridepublic void run() {while (print()) ;}private synchronized boolean print() {for (int i = 0; i < 10; ) {if (evenPoint >= records.length && oddPoint >= records.length) {notifyAll();return false;}if ((evenPoint >= records.length && type == EVEN) || (oddPoint >= records.length && type == ODD)) {break;}if (type == EVEN) {if (records[evenPoint] % 2 == 0) {i++;writer.print(records[evenPoint] + " ");writer.flush();synchronized (lock) {count++;if (count % 1000 == 0) {System.out.println("已经打印了" + count+"个");if (count == 10000) {System.out.println("Done!");}}}}evenPoint++;} else {if (records[oddPoint] % 2 == 1) {i++;writer.print(records[oddPoint] + " ");writer.flush();synchronized (lock) {count++;if (count % 1000 == 0) {System.out.println("已经打印了" + count+"个");if (count == 10000) {System.out.println("Done!");}}}}oddPoint++;}}type = ~type;notifyAll();try {wait();} catch (InterruptedException e) {e.printStackTrace();}return true;}
}

聊聊并发编程的12种业务场景

实际场景中的多线程使用相关推荐

  1. 如何应对面试官:什么场景中会用到java多线程?

    如何应对面试官:什么场景中会用到java多线程? 作者:云栖社区 原文:https://yq.aliyun.com/ziliao/1765 (点击文末阅读原文即可前往) 问:能不能简单描述一下你在ja ...

  2. 【worker】js中的多线程

    因为下个项目中要用到一些倒计时的功能,所以就提前准备了一下,省的到时候出现一下界面不友好和一些其他的事情.正好趁着这个机会也加深一下html5中的多线程worker的用法和理解. Worker简介 J ...

  3. 第13章 C#中的多线程

    第13章多线程 13.1 线程概述 计算机的操作系统多采用多任务和分时设计.多任务是指在一个操作系统中开以同时运行多个程序.例如,可以在使用QQ聊天的同时听音乐,即有多个独立的任务,每个任务对应一个进 ...

  4. C#中的多线程 - 同步基础

    C#中的多线程 - 同步基础 C#中的多线程 - 同步基础 1同步概要 在第 1 部分:基础知识中,我们描述了如何在线程上启动任务.配置线程以及双向传递数据.同时也说明了局部变量对于线程来说是私有的, ...

  5. C#中的多线程 - 并行编程 z

    原文:http://www.albahari.com/threading/part5.aspx 专题:C#中的多线程 1并行编程Permalink 在这一部分,我们讨论 Framework 4.0 加 ...

  6. Java的并发编程中的多线程问题到底是怎么回事儿?

    转载自   Java的并发编程中的多线程问题到底是怎么回事儿? 在我之前的一篇<再有人问你Java内存模型是什么,就把这篇文章发给他.>文章中,介绍了Java内存模型,通过这篇文章,大家应 ...

  7. udp怎么保证不丢包_在 Flink 算子中使用多线程如何保证不丢数据?

    分析痛点 笔者线上有一个 Flink 任务消费 Kafka 数据,将数据转换后,在 Flink 的 Sink 算子内部调用第三方 api 将数据上报到第三方的数据分析平台.这里使用批量同步 api,即 ...

  8. 线程中如何使用对象_在 Flink 算子中使用多线程如何保证不丢数据?

    简介: 本人通过分析痛点.同步批量请求优化为异步请求.多线程 Client 模式.Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线 ...

  9. c语言中锁的作用,C语言中的多线程死锁

    我是C的新手,在下面的多线程场景中,N个线程从一个二进制文件中读取,并写入自己的单独文件,例如线程1写入文件1,线程2写入文件2,等等. 这对~2/3个线程有效,但对于其他线程,它似乎陷入了死锁,但我 ...

最新文章

  1. MySQL安装时MySQL server一直安装失败日志显示This application requires Visual Studio 2013 Redistributable...
  2. linux安装jdk、tomcat
  3. BizTalk 10034 错误
  4. 应用程序池优化配置方案(IIS7、IIS7.5)
  5. python os.getpidos.getppid
  6. postgresql基本使用(一)
  7. java抓取动态生成的网页
  8. 系统对接方案_一个呼叫中心系统组建的案例
  9. java中遍历Map几种方法
  10. 大脚导入配置选择哪个文件_「干货」图解 IntelliJ IDEA 最常用配置,适合新手
  11. Guri团队的侧信道攻击研究
  12. 即将前往下一个饭局,你的牙还好吗?丨钛空舱爆款春节特献
  13. request_threaded_irq()参数
  14. StoryBoard 创建tabBarController
  15. python小白学习第三节
  16. 华为在全球500强排名跃升,因拥中国市场
  17. 国内研发团队普遍常见问题
  18. 从带宏密码保护的Excel文件中导出VBA代码和Sheet
  19. android3d动画的实现,5秒让你的View变3D,ThreeDLayout使用和实现
  20. 曾国藩:人最大的差距,是见识和格局

热门文章

  1. 利用SPSS做数据分析②之数据处理2
  2. C++中 itoa 和 atoi 的用法
  3. freemarker遍历list处理第一个、最后一个元素
  4. SS00027.algorithm——|ArithmeticMachine.v27|——|Machine:项目实战.v04|竞争分析|
  5. eNSP华为模拟器安装
  6. 小程序 制作自定义弹层 添加弹层显示和隐藏动画 父组件与子组件(自定义组件)之间传值
  7. Altium Designer 如何从已有的PCB图、原理图,分别导出PCB封装库和原理图封装库
  8. 事务四大特性(ACID)原子性、一致性、隔离性、持久性
  9. 或是独体字吗_独体字结构 独体结构的字有哪些字?
  10. 制作一个属于自己的BHO吧!(C#)