作者 | 大道方圆

来源 | cnblogs.com/xdecode/p/9137793.html

本文主要讲解几种常见并行模式, 具体目录结构如下图.

单例

单例是最常见的一种设计模式, 一般用于全局对象管理, 比如xml配置读写之类的.

一般分为懒汉式, 饿汉式.

懒汉式: 方法上加synchronized

public static synchronized Singleton getInstance() {  if (single == null) {    single = new Singleton();  }    return single;
}

这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差

懒汉式: 使用双检锁 + volatile

private volatile Singleton singleton = null;public static Singleton getInstance() {if (singleton == null) {synchronized (Singleton.class) {if (singleton == null) {singleton = new Singleton();}}}return singleton;}

本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁.

后续调用getInstance已经是无锁状态. 只是写法上稍微繁琐点.

至于为什么要volatile关键字, 主要涉及到jdk指令重排, 详见之前的博文: Java内存模型与指令重排

懒汉式: 使用静态内部类

public class Singleton {private static class LazyHolder {private static final Singleton INSTANCE = new Singleton();}private Singleton (){}public static final Singleton getInstance() {return LazyHolder.INSTANCE;}
}

该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法.关注微信公众号:互联网架构师,在后台回复:2T,可以获取我整理的教程,都是干货。

缺点在于无法响应事件来重新初始化INSTANCE.

饿汉式

public class Singleton1 {private Singleton1() {}private static final Singleton1 single = new Singleton1();public static Singleton1 getInstance() {return single;}
}

缺点在于对象在一开始就直接初始化了.

Future模式

该模式的核心思想是异步调用. 有点类似于异步的ajax请求.

当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果.

因此可以让调用者立刻返回一个凭证, 该方法放到另外线程执行,后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下

jdk中内置了Future模式的支持, 其接口如下:

通过FutureTask实现

注意其中两个耗时操作.

  • 如果doOtherThing耗时2s, 则整个函数耗时2s左右.

  • 如果doOtherThing耗时0.2s, 则整个函数耗时取决于RealData.costTime, 即1s左右结束.

public class FutureDemo1 {public static void main(String[] args) throws InterruptedException, ExecutionException {FutureTask<String> future = new FutureTask<String>(new Callable<String>() {@Overridepublic String call() throws Exception {return new RealData().costTime();}});ExecutorService service = Executors.newCachedThreadPool();service.submit(future);System.out.println("RealData方法调用完毕");// 模拟主函数中其他耗时操作doOtherThing();// 获取RealData方法的结果System.out.println(future.get());}private static void doOtherThing() throws InterruptedException {Thread.sleep(2000L);}
}class RealData {public String costTime() {try {// 模拟RealData耗时操作Thread.sleep(1000L);return "result";} catch (InterruptedException e) {e.printStackTrace();}return "exception";}}

通过Future实现

与上述FutureTask不同的是, RealData需要实现Callable接口.关注微信公众号:互联网架构师,在后台回复:2T,可以获取我整理的教程,都是干货。

public class FutureDemo2 {public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService service = Executors.newCachedThreadPool();Future<String> future = service.submit(new RealData2());System.out.println("RealData2方法调用完毕");// 模拟主函数中其他耗时操作doOtherThing();// 获取RealData2方法的结果System.out.println(future.get());}private static void doOtherThing() throws InterruptedException {Thread.sleep(2000L);}
}class RealData2 implements Callable<String>{public String costTime() {try {// 模拟RealData耗时操作Thread.sleep(1000L);return "result";} catch (InterruptedException e) {e.printStackTrace();}return "exception";}@Overridepublic String call() throws Exception {return costTime();}
}

另外Future本身还提供了一些额外的简单控制功能, 其API如下

// 取消任务boolean cancel(boolean mayInterruptIfRunning);// 是否已经取消boolean isCancelled();// 是否已经完成boolean isDone();// 取得返回对象V get() throws InterruptedException, ExecutionException;// 取得返回对象, 并可以设置超时时间V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

生产消费者模式

生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。

在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。关注微信公众号:互联网架构师,在后台回复:2T,可以获取我整理的教程,都是干货。

生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。

生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下

PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列.

消费者从缓冲队列中获取数据, 并执行计算.

生产者核心代码

while(isRunning) {Thread.sleep(r.nextInt(SLEEP_TIME));data = new PCData(count.incrementAndGet);// 构造任务数据System.out.println(data + " is put into queue");if (!queue.offer(data, 2, TimeUnit.SECONDS)) {// 将数据放入队列缓冲区中System.out.println("faild to put data : " + data);}}

消费者核心代码

while (true) {PCData data = queue.take();// 提取任务if (data != null) {// 获取数据, 执行计算操作int re = data.getData() * 10;System.out.println("after cal, value is : " + re);Thread.sleep(r.nextInt(SLEEP_TIME));}}

生产消费者模式可以有效对数据解耦, 优化系统结构.

降低生产者和消费者线程相互之间的依赖与性能要求.

一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,

如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentLinkedQueue.

分而治之

严格来讲, 分而治之不算一种模式, 而是一种思想.

它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.

我们主要讲两个场景, Master-Worker模式, ForkJoin线程池.

Master-Worker模式

该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程.

Master负责接收与分配任务, Worker负责处理任务. 当各个Worker处理完成后,

将结果返回给Master进行归纳与总结.

假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程.

Master代码

public class MasterDemo {// 盛装任务的集合private ConcurrentLinkedQueue<TaskDemo> workQueue = new ConcurrentLinkedQueue<TaskDemo>();// 所有workerprivate HashMap<String, Thread> workers = new HashMap<>();// 每一个worker并行执行任务的结果private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>();public MasterDemo(WorkerDemo worker, int workerCount) {// 每个worker对象都需要持有queue的引用, 用于领任务与提交结果worker.setResultMap(resultMap);worker.setWorkQueue(workQueue);for (int i = 0; i < workerCount; i++) {workers.put("子节点: " + i, new Thread(worker));}}// 提交任务public void submit(TaskDemo task) {workQueue.add(task);}// 启动所有的子任务public void execute(){for (Map.Entry<String, Thread> entry : workers.entrySet()) {entry.getValue().start();}}// 判断所有的任务是否执行结束public boolean isComplete() {for (Map.Entry<String, Thread> entry : workers.entrySet()) {if (entry.getValue().getState() != Thread.State.TERMINATED) {return false;}}return true;}// 获取最终汇总的结果public int getResult() {int result = 0;for (Map.Entry<String, Object> entry : resultMap.entrySet()) {result += Integer.parseInt(entry.getValue().toString());}return result;}}

Worker代码

public class WorkerDemo implements Runnable{private ConcurrentLinkedQueue<TaskDemo> workQueue;private ConcurrentHashMap<String, Object> resultMap;@Overridepublic void run() {while (true) {TaskDemo input = this.workQueue.poll();// 所有任务已经执行完毕if (input == null) {break;}// 模拟对task进行处理, 返回结果int result = input.getPrice();this.resultMap.put(input.getId() + "", result);System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread().getName());}}public ConcurrentLinkedQueue<TaskDemo> getWorkQueue() {return workQueue;}public void setWorkQueue(ConcurrentLinkedQueue<TaskDemo> workQueue) {this.workQueue = workQueue;}public ConcurrentHashMap<String, Object> getResultMap() {return resultMap;}public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {this.resultMap = resultMap;}
}
public class TaskDemo {private int id;private String name;private int price;public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getPrice() {return price;}public void setPrice(int price) {this.price = price;}
}

主函数测试

MasterDemo master = new MasterDemo(new WorkerDemo(), 10);for (int i = 0; i < 100; i++) {TaskDemo task = new TaskDemo();task.setId(i);task.setName("任务" + i);task.setPrice(new Random().nextInt(10000));master.submit(task);}master.execute();while (true) {if (master.isComplete()) {System.out.println("执行的结果为: " + master.getResult());break;}}

ForkJoin线程池

该线程池是jdk7之后引入的一个并行执行任务的框架, 其核心思想也是将任务分割为子任务,

有可能子任务还是很大, 还需要进一步拆解, 最终得到足够小的任务.

将分割出来的子任务放入双端队列中, 然后几个启动线程从双端队列中获取任务执行.

子任务执行的结果放到一个队列里, 另起线程从队列中获取数据, 合并结果.

假设我们的场景需要计算从0到20000000L的累加求和. CountTask继承自RecursiveTask, 可以携带返回值.

每次分解大任务, 简单的将任务划分为100个等规模的小任务, 并使用fork()提交子任务.

在子任务中通过THRESHOLD设置子任务分解的阈值, 如果当前需要求和的总数大于THRESHOLD, 则子任务需要再次分解,如果子任务可以直接执行, 则进行求和操作, 返回结果. 最终等待所有的子任务执行完毕, 对所有结果求和.

public class CountTask extends RecursiveTask<Long>{// 任务分解的阈值private static final int THRESHOLD = 10000;private long start;private long end;public CountTask(long start, long end) {this.start = start;this.end = end;}public Long compute() {long sum = 0;boolean canCompute = (end - start) < THRESHOLD;if (canCompute) {for (long i = start; i <= end; i++) {sum += i;}} else {// 分成100个小任务long step = (start + end) / 100;ArrayList<CountTask> subTasks = new ArrayList<CountTask>();long pos = start;for (int i = 0; i < 100; i++) {long lastOne = pos + step;if (lastOne > end) {lastOne = end;}CountTask subTask = new CountTask(pos, lastOne);pos += step + 1;// 将子任务推向线程池subTasks.add(subTask);subTask.fork();}for (CountTask task : subTasks) {// 对结果进行joinsum += task.join();}}return sum;}public static void main(String[] args) throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 累加求和 0 -> 20000000LCountTask task = new CountTask(0, 20000000L);ForkJoinTask<Long> result = pool.submit(task);System.out.println("sum result : " + result.get());}
}

ForkJoin线程池使用一个无锁的栈来管理空闲线程, 如果一个工作线程暂时取不到可用的任务, 则可能被挂起.

挂起的线程将被压入由线程池维护的栈中, 待将来有任务可用时, 再从栈中唤醒这些线程.

关微信公众号:互联网架构师,在后台回复:2T,可以获取我整理的教程,都是干货。

猜你喜欢

1、GitHub 标星 3.2w!史上最全技术人员面试手册!FackBoo发起和总结

2、如何才能成为优秀的架构师?

3、从零开始搭建创业公司后台技术栈

4、程序员一般可以从什么平台接私活?

5、37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...

6、滴滴业务中台构建实践,首次曝光

7、不认命,从10年流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事

8、15张图看懂瞎忙和高效的区别

9、2T架构师学习资料干货分享

Java高并发之设计模式,设计思想!相关推荐

  1. Java高并发之设计模式,设计思想

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:大道方圆 cnblogs.com/xdecode/p/913 ...

  2. 深入Java线程池:从设计思想到源码解读

    点击关注公众号,实用技术文章及时了解 初识线程池 我们知道,线程的创建和销毁都需要映射到操作系统,因此其代价是比较高昂的.出于避免频繁创建.销毁线程以及方便线程管理的需要,线程池应运而生. 线程池优势 ...

  3. Java高并发之Hosee博客内容整理

    Hosee博客博客高并发系列目录 [高并发Java 一] 前言 [高并发Java 二] 多线程基础 [高并发Java 三] Java内存模型和线程安全 [高并发Java 四] 无锁 [高并发Java ...

  4. Java高并发之锁优化

    本文主要讲并行优化的几种方式, 其结构如下: 锁优化 减少锁的持有时间 例如避免给整个方法加锁 1 public synchronized void syncMethod(){ 2 othercode ...

  5. java高并发之魂:Synchronize

    synchronize在高并发领域可谓是元老级别了,博主最近整理了一下synchronize的用法,和简单的概念. 概念 对象锁:包括 方法锁(默认锁对象为this当前实例对象)和同步代码块锁(自己指 ...

  6. Java高并发之BlockingQueue

    前言碎语 当系统流量负载比较高时,业务日志的写入操作也要纳入系统性能考量之内,如若处理不当,将影响系统的正常业务操作,之前写过一篇<spring boot通过MQ消费log4j2的日志>的 ...

  7. Java高并发之魂:synchronized深度解析

    本文整理自慕课网的讲师悟空老师,教学地址:http://www.imooc.com/learn/1086 文章目录 一.synchronized简介 1 synchronized作用 1.1 官方翻译 ...

  8. java消息分发_Kafka教程设计思想之消息分发策略

    现在我们对于 producer 和 consumer 的工作原理已将有了一点了解,让我们接着讨论 Kafka 在 producer 和 consumer 之间提供的语义保证.显然,Kafka可以提供的 ...

  9. Java高并发之CountDownLatch源码分析

    概述 CountDownLatch 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助.简单来说,就是 CountDownLatch 内部维护了一个计数器,每个线程完成自己的操作之后都 ...

  10. java 高并发_Java 高并发之无锁(CAS)

    Java 高并发之无锁(CAS) 本篇主要讲 Java中的无锁 CAS ,无锁 顾名思义就是 以不上锁的方式解决并发问题,而不使用synchronized 和 lock 等.. 1. Atomic 包 ...

最新文章

  1. 车辆密度估计--Understanding Traffic Density from Large-Scale Web Camera Data
  2. python使用kafka原理详解真实完整版_转:Kafka史上最详细原理总结 ----看完绝对不后悔...
  3. javascript运动的小框架
  4. 鸿蒙os2.0通知栏,网友上手鸿蒙手机OS 2.0公测版:界面与EMUI已有明显不同
  5. 阿里云OSS Multipart Upload上传实例
  6. 记录一次IDEA开发JavaWeb项目时JS中文乱码排错方法
  7. usb 系统消息_小米USB3.0分线器发布:四口USB 3.0+USB-C
  8. java ajaxsubmit_jQuery 使用 ajaxSubmit() 提交表单实现方法
  9. python sdk是什么意思_什么是 SDK?
  10. 51nod1538: 一道难题(特征多项式+多项式取模/求逆)
  11. 另类方法申请google adsense账号
  12. 自己动手撸一个Jlink-TinyJlink诞生记
  13. 计算机网络中请求超时是什么意思,请求超时什么意思
  14. 海乐淘商城系统--01前缀(功能介绍以及关于架构)
  15. 电路设计_光耦的主要参数
  16. 大厂对软件测试的误解这么深吗?测试岗会越来越少吗?该怎样提升技术?
  17. 数据科学系列:plotly可视化入门介绍
  18. 在IDEA中使用JUnit4和JUnitGenerator V2.0自动生成测试模块
  19. Eclipse如何开发Android?
  20. 度盘满速直接下载无需登录直接下载。

热门文章

  1. ThinkPHP5.0之控制器中常用操作
  2. 如果奇迹有颜色,那么一定是暴力or模拟比较6
  3. Arrays.asList的用法
  4. Nginx负载均衡服务器实现会话粘贴的几种方式
  5. 全局 HOOK 研究
  6. .Net中的数字和日期格式化规则助记词
  7. Mac查看本机公网IP
  8. 安装Homebrew是报错,安装不成功(亲测使用,确实解决了问题)
  9. iOS开发判断字符串为null
  10. 排版设计软件QuarkXPress 2022 for mac