生产者消费者模式介绍

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。阻塞队列如何实现高并发多线程安全也是生产者消费者模式中的核心关键。

在日常开发过程中,我们常常会遇到一些高并发场景,例如很多秒杀场景,其实真实的秒杀场景会很复杂,这里只是简单描述下秒杀场景下的生产者消费者模式,在秒杀场景下生产者是普通参与秒杀的用户,消费者是秒杀系统,通常来说这样的场景下秒杀用户是非常多的,如果系统采用常规的实时同步交易,那么势必造成系统处理线程池被瞬间占满,后续请求全部被丢弃,这样造成的用户体验是非常差的,而且系统可能会出现快速雪崩。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。秒杀场景是个非常适合生产者、消费者的模式,通过中间的缓冲队列解决秒杀请求接收和秒杀处理两者之间的处理时间差,并且能够在过程中通过反欺诈和公平算法来保证消费者的公平利益。下面我们就来用java实现下生产者和消费者模式,这里实现的是可以直接用于生产环境的架构,并不是简单的使用Queue做个简单的进栈出栈,而是通过java.util.concurrent下的相关类实现生产者消费者模式的多线程方案。

Java实现生产者消费者模式

队列的特性:先进先出(FIFO)—先进入队列的元素先出队列(可以理解为我们生活中的排队情况,早办完,早滚蛋)。生产者(Producer)往队列里发布(publish)事件,消费者(Consumer)获得通知,消费事件;如果队列中没有事件时,消费者堵塞,直到生产者发布了新事件。

说到队列,那就不得不提到Java中的concurrent包,其主要实现包括ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue、LinkedTransferQueue。下面,简单介绍下:

  • ArrayBlockingQueue:基于数组形式的队列,通过加锁的方式,来保证多线程情况下数据的安全;

  • LinkedBlockingQueue:基于链表形式的队列,也通过加锁的方式,来保证多线程情况下数据的安全;

  • ConcurrentLinkedQueue:基于链表形式的队列,通过compare and swap(简称CAS)协议的方式,来保证多线程情况下数据的安全,不加锁,主要使用了Java中的sun.misc.Unsafe类来实现;

  • LinkedTransferQueue:同上;

因为LinkedBlockingQueue采用了乐观锁方案,所以性能是非常高的,下面我们就用LinkedBlockingQueue作为队列缓冲区来实现生产者消费者模式。

待处理数据类

首先我们需要实现一个缓冲队列中的待处理类,这里例子中实现的比较简单,只是设置了一个int类型的变量,重写了构造函数并定义了get方法,大家可以根据自己的需要定义相关的内容。

public class PCData {private final int intData;public PCData(int intData) {this.intData = intData;}public int getIntData() {return intData;}@Overridepublic String toString() {return "PCData{" +"intData=" + intData +'}';}
}

生产者类

下面我们定义生产者类,在生产者类中需要定义一个缓冲队列,这里使用了刚才提到的BlockingDeque。

private BlockingDeque<PCData> queue;

生产者中还需要再定义一个静态的AtomicInteger类型的对象,用于多线程中共享数据,用于生成PCData,为什么使用AtomicInteger类型,是因为AtomicInteger类型已经实现了线程安全的自增功能,在实际项目使用过程中,这个值可能是UUID或者其他的全局唯一的数值。

private static AtomicInteger count = new AtomicInteger();

还需要重写构造方法,在生成生产者的时候使用同一个缓冲队列,来保证生产者和开发者都使用一样的队列,在实际项目中也可以定一个全局的队列,来保证所有的生产者和消费者都使用同一个对列。

//定义入参为BlockingQueue的构造函数public Producer(BlockingDeque<PCData> queue){this.queue = queue;}

生产者的核心方法中主要实现了创建PCData类并将该待处理对象放入缓冲队列中,这里为了模拟处理耗时,sleep了1秒钟,所有继承子BlockingDeque的队列类都实现了offer方法,该方法主要是将待处理对象放入缓冲队列中,这样生产者就完成了生产者的基本工作,创建待处理类对象,并将其放入队列。

Thread.sleep(1000);
data = new PCData(count.incrementAndGet());
queue.offer(data);

下面是整个Producer的代码:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;/*** @Author: feiweiwei* @Description: 生产者* @Created Date: 17:14 17/9/10.* @Modify by:*/
public class Producer implements Runnable {private volatile boolean isrunning = true;//内存缓冲队列private BlockingDeque<PCData> queue;private static AtomicInteger count = new AtomicInteger();//定义入参为BlockingQueue的构造函数public Producer(BlockingDeque<PCData> queue){this.queue = queue;}public void stop(){this.isrunning = false;}@Overridepublic void run() {PCData data = null;System.out.println("producer id = " + Thread.currentThread().getId());while (isrunning) {try {Thread.sleep(1000);data = new PCData(count.incrementAndGet());queue.offer(data);} catch (InterruptedException e) {e.printStackTrace();}}}
}

消费者类

消费者类的核心工作就是将待处理数据从缓冲队列中取出,并处理。

在消费者类中同样有个BlockingDeque<PCData>的对象,同样也是在创建消费者类的时候从外部传入,这样可以保证所有生产者和消费者使用一样的队列。

在核心处理逻辑中通过BlockingDeque的take方法取出待处理对象,然后就可以对该对象进行处理了,调用take方法后,该待处理对象也自动从queue中弹出。

下面是消费者实现代码:

import java.util.concurrent.BlockingDeque;/*** @Author: feiweiwei* @Description: 消费者类* @Created Date: 17:26 17/9/10.* @Modify by:*/
public class Customer implements Runnable {private BlockingDeque<PCData> queue;private volatile boolean isrunning = true;//定义入参为BlockingQueue的构造函数public Customer(BlockingDeque<PCData> queue){this.queue = queue;}public void stop(){this.isrunning = false;}@Overridepublic void run() {System.out.println("customer id = " + Thread.currentThread().getId());while (isrunning){try {PCData data = queue.take();if ( null != data){int re = data.getIntData() * data.getIntData();Thread.sleep(1000);System.out.println(Thread.currentThread().getId() + " data is " + re + "done!");}} catch (InterruptedException e) {e.printStackTrace();}}}
}

主调用Main

在主调用Main中,我们先创建一个队列长度为10的LinkedBlockingDeque对象作为缓冲队列。

BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);

再分别创建10个生产者对象和2个消费者,并将刚才创建的queue对象作为构造函数入参。

Producer[] producers = new Producer[10];
Customer[] customers = new Customer[2];for(int i=0; i<10; i++){producers[i] = new Producer(queue);
}
for(int j=0; j<2; j++){customers[j] = new Customer(queue);
}

创建一个线程池将生产者和消费者调用起来,这里的线程池大家可以使用自定义的线程池。

ExecutorService es = Executors.newCachedThreadPool();
for(Producer producer : producers){es.execute(producer);
}for(Customer customer : customers){es.execute(customer);
}

下面是Main代码:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;/*** @Author: feiweiwei* @Description:* @Created Date: 17:29 17/9/10.* @Modify by:*/
public class Main {public static void main(String[] args) throws InterruptedException {BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);Producer[] producers = new Producer[10];Customer[] customers = new Customer[2];for(int i=0; i<10; i++){producers[i] = new Producer(queue);}for(int j=0; j<2; j++){customers[j] = new Customer(queue);}ExecutorService es = Executors.newCachedThreadPool();for(Producer producer : producers){es.execute(producer);}for(Customer customer : customers){es.execute(customer);}Thread.sleep(10000);for(Producer producer : producers){producer.stop();}for(Customer customer : customers){customer.stop();}es.shutdown();}
}

Disruptor实现生产者消费者模式

刚才那个是我们自己使用java.util.concurrent下的类实现的生产者消费者模式,目前业界已经有比较成熟的方案,这里向大家推荐LMAX公司开源的Disruptor框架,Disruptor是一个开源的框架,可以在无锁的情况下对队列进行操作,那么这个队列的设计就是Disruptor的核心所在。

在Disruptor中,采用了RingBuffer来作为队列的数据结构,RingBuffer就是一个环形的数组,既然是数组,我们便可对其设置大小。在这个ringBuffer中,除了数组之外,还有一个序列号,是用来指向数组中的下一个可用元素,供生产者使用或者消费者使用,也就是生产者可以生产的地方,或者消费者可以消费的地方。在Disruptor中使用的是位运算,并且在Disruptor中数组内的元素并不会被删除,而是新数据来覆盖原有数据,所以整个环链的处理效率非常高。

下面我们使用Disruptor来实现刚才用jdk自带库实现的生产者消费者。

Disruptor主要类

  • Disruptor:Disruptor的入口,主要封装了环形队列RingBuffer、消费者集合ConsumerRepository的引用;主要提供了获取环形队列、添加消费者、生产者向RingBuffer中添加事件(可以理解为生产者生产数据)的操作;

  • RingBuffer:Disruptor中队列具体的实现,底层封装了Object[]数组;在初始化时,会使用Event事件对数组进行填充,填充的大小就是bufferSize设置的值;此外,该对象内部还维护了Sequencer(序列生产器)具体的实现;

  • Sequencer:序列生产器,分别有MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。上面的例子中,使用的是SingleProducerSequencer;在Sequencer中,维护了消费者的Sequence(序列对象)和生产者自己的Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略WaitStrategy;

  • Sequence:序列对象,内部维护了一个long型的value,这个序列指向了RingBuffer中Object[]数组具体的角标。生产者和消费者各自维护自己的Sequence;但都是指向RingBuffer的Object[]数组;

  • Wait Strategy:等待策略。当没有可消费的事件时,消费者根据特定的策略进行等待;当没有可生产的地方时,生产者根据特定的策略进行等待;

  • Event:事件对象,就是我们Ringbuffer中存在的数据,在Disruptor中用Event来定义数据,并不存在Event类,它只是一个定义;

  • EventProcessor:事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用我们自定义的事件处理器,也就是是否可以进行消费;

  • EventHandler:事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现EventHandler接口;

  • Producer:事件生产者,也就是我们上面代码中最后那部门的for循环;

待处理类

Disruptor的待处理类和自己实现的待处理类没有本质的区别,可以按照自己要求进行定义。

public class PCData {private int data;public int getData() {return data;}public void setData(int data) {this.data = data;}
}

待处理类工厂

这里需要实现disruptor的EventFactory接口,并且实现newInstance方法。这里我们实现的newInstance方法,其实就是创建待处理类的对象,该工厂类在创建Disruptor对象的时候会使用到。

import com.lmax.disruptor.EventFactory;/*** @Author: feiweiwei* @Description: 待处理类工厂* @Created Date: 18:55 17/9/10.* @Modify by:*/
public class PCDataFactory implements EventFactory<PCData> {@Overridepublic PCData newInstance() {return new PCData();}
}

disruptor生产者类

同样需要在生产者中定义一个RingBuffer<PCData>的环形队列,还需要实现一个push的方法,通过ringBuffer.next()取到下一个待处理类序列号,使用ringBuffer.get(sequence)获取到这个序列号对应的待处理类,并对待处理类进行赋值为新的待处理类。
最后通过ringBuffer.publish(sequence)才会将待处理对象发布出来,消费者才能看到。

import com.lmax.disruptor.RingBuffer;/*** @Author: feiweiwei* @Description: disruptor生产者类* @Created Date: 18:56 17/9/10.* @Modify by:*/
public class Producer {private final RingBuffer<PCData> ringBuffer;public Producer(RingBuffer<PCData> ringBuffer) {this.ringBuffer = ringBuffer;}public void pushData(int data){long sequence = ringBuffer.next();try{PCData event = ringBuffer.get(sequence);event.setData(data);}finally {ringBuffer.publish(sequence);}}
}

disruptor消费者

disruptor的消费者类需要实现WorkHandler接口,并实现onEvent方法来处理待处理类,例子中只是对待处理类中的值做了平方。

import com.lmax.disruptor.WorkHandler;/*** @Author: feiweiwei* @Description: disruptor消费者* @Created Date: 18:52 17/9/10.* @Modify by:*/
public class Consumer implements WorkHandler<PCData> {@Overridepublic void onEvent(PCData pcData) throws Exception {System.out.println(Thread.currentThread().getId() +"Event = " + pcData.getData()*pcData.getData());}
}

Main

待处理类、待处理工厂、生产者、消费者都定义好之后就可以进行使用了,定义一个缓行队列为1024的disruptor对象,这里构造函数入参看名字就知道了,很简单。

PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor,
ProducerType.MULTI,new BlockingWaitStrategy());

给disruptor对象定义消费者,这里就简单定义两个consumer作为生产者。

disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer());

初始化Producer并且将ringBuffer作为构造函数入参,并通过生产者循环100次将数据push入队列,消费者会自动从队列取值进行处理。

RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);for(int i=0; i<100; i++){producer.pushData(i);Thread.sleep(100);System.out.println("push data " + i);
}

以下为Main全部代码:

package com.monkey01.producercustomer.disruptor;import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;/*** @Author: feiweiwei* @Description:* @Created Date: 18:59 17/9/10.* @Modify by:*/
public class Main {public static void main(String args[]) throws InterruptedException {Executor executor = Executors.newCachedThreadPool();PCDataFactory factory = new PCDataFactory();int bufferSize = 1024;Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor,ProducerType.MULTI,new BlockingWaitStrategy());disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer());disruptor.start();RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();Producer producer = new Producer(ringBuffer);for(int i=0; i<100; i++){producer.pushData(i);Thread.sleep(100);System.out.println("push data " + i);}disruptor.shutdown();}
}

总结

大家看到这里也基本对生产者、消费者模式有个比较深入的了解了,也可以按照文中的例子,在自己的项目中使用,这个模式在日常项目中还是比较常见的,希望大家能够熟练使用该模式。

生产者消费者模式-java原生、Disruptor实现方案相关推荐

  1. 生产者消费者模式Java实现

    package com.szc.java;/*** @Author szc* @Date 2021/8/24*/ //缓冲区(售货员) class Clerk {private int quantit ...

  2. java消费者模式_基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...

  3. Java并发编程实战~生产者-消费者模式

    前面我们在<Worker Thread 模式>中讲到,Worker Thread 模式类比的是工厂里车间工人的工作模式.但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就 ...

  4. java 消费者模式 多线程_[Java并发-24-并发设计模式] 生产者-消费者模式,并发提高效率...

    生产者 - 消费者模式在编程领域的应用非常广泛,前面我们曾经提到,Java 线程池本质上就是用生产者 - 消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者 - 消费者模式. 当然,除了 ...

  5. java生产线消费者,基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...

  6. java lock condition_Java 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式

    更多 Java 并发编程方面的文章,请参见文集<Java 并发编程> 竞争条件 多个线程共享对某些变量的访问,其最后结果取决于哪个线程偶然在竞争中获胜. condition.await() ...

  7. java消费者生产者设计模式_java 多线程并发设计模式之四: 生产者消费者模式

    生产者消费者模式是一个经典的多线程设计模式,其核心思想是:有两类线程和一个内存缓冲区或者队列, 一类线程发起任务,并提交到队列中.另一类线程用来处理这些任务,叫做消费者线程. 这两类线程进行通信的桥梁 ...

  8. 菜鸟学习笔记:Java提升篇8(线程2——线程的基本信息、线程安全、死锁、生产者消费者模式、任务调度)

    菜鸟学习笔记:Java提升篇8(线程2--线程的基本信息.线程安全.死锁.生产者消费者模式.任务调度) 线程的基本信息 线程同步 线程安全 死锁 生产者消费者模式 任务调度(了解) 线程的基本信息 J ...

  9. java consumed_Java设计模式—生产者消费者模式(阻塞队列实现)

    生产者消费者模式是并发.多线程编程中经典的 真实世界中的生产者消费者模式 生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系.比如一个人正在准备食物(生产者),而另一个人正在吃(消费者) ...

最新文章

  1. 开源硬件:极客们的伟大理想
  2. python udp_如何用python方法检测UDP端口
  3. 优雅地记录Python程序日志2:模块组件化日志记录器
  4. linux桌面xfce美化_Xfce Linux桌面环境,Arduino机器人,热门DevOps书籍,FreeDOS,Python,Go等
  5. php mysql查询出来二叉树的数据_tp框架怎么实现二叉树查询 如图,查询数据库中小明下面的所有人。到底下面多少人,不清楚。 代码如何实现...
  6. SAP License:什么是SAP ECC?与WMS系统集成技术要点
  7. java 远程连接_java实现连接远程服务器并执行命令的基本原理
  8. 存储器和 I/O 端口有哪两种编址方式?简要说明各自特点
  9. 医疗健康领域的短文本解析探索 ----文本纠错
  10. chrome浏览器js 导出excel
  11. html 默认ie设置,如何设置ie为默认浏览器,教您如何设置ie为默认浏览器
  12. lwj_C#_作业 ListT应用
  13. 符号三角形问题 java_算法java实现--回溯法--符号三角形问题
  14. Chrome浏览器占用CPU资源过高(Software Reporter Tool)
  15. 手工脱壳之 ASPack压缩壳【随机基址】【重定位表加密】
  16. leet code: Two Sum
  17. 福建农林大学计算机分数线,福建农林大学录取分数线2021是多少分(附历年录取分数线)...
  18. android x86评测,异于Win8新体验 x86版Android4.0解析
  19. div内容文字自适应
  20. [渝粤教育] 广东-国家-开放大学 21秋期末考试应急管理10413k2

热门文章

  1. Redis基础(三)——数据类型
  2. POJ 1741 Tree(树的点分治)
  3. java---解析XML文件,通过反射动态将XML内容封装到一个类中
  4. python接口自动化(三十二)--Python发送邮件(常见四种邮件内容)番外篇——上
  5. python创建虚拟环境报错typeerror_python 创建虚拟环境时报错OSError, setuptools下载失败...
  6. 三菱伺服自动调谐_自动化领域最值得关注的十大伺服电机
  7. 云服务 华为p10 短信_苹果、小米、华为,手机云服务哪家强?
  8. SpringBoot项目启动时控制台乱码,怎么办?
  9. layer.open回显数据select选择默认值
  10. 读取XML文件报 - Content is not allowed in prolog