生产者消费者模型分析

操作系统中的经典模型,由若干个消费者和生产者,消费者消耗系统资源,生产者创造系统资源,资源的数量要保持在一个合理范围(小于数量上限,大约0)。而消费者和生产者是通过并发或并行方式访问系统资源的,需要保持资源的原子操作。
其实就是生产者线程增加资源数,如果资源数大于最大值则生产者线程挂起等待,当收到消费者线程的通知后继续生产。
消费者线程减少资源数,如果资源数为0,则消费者线程挂起,等待生产者通知后继续生产。
将该模型提炼成伪代码如下:

func consume(){Lock()if count <= 0挂起等待(解锁,并等待资源数大于0)收到系统通知资源数大约0,抢占加锁count--如果当前资源数由最大值变少则通知生产者生产ULock()
}func produce(){Lock()if count >= 最大值挂起等待(解锁,并等待资源数小于最大值)收到系统通知资源小于最大值,抢占加锁count++如果当前资源数由最小值0增加则通知消费者可以消耗ULock()
}

  

consume()消耗资源,produce()生产资源,之前实现过C版本的该模型

http://www.limerence2017.com/2017/08/08/pthreadwait/
C方式实现的是抢占式的,线程切换开销较大。下面给出golang协程方式的实现。

先实现资源的互斥访问

对于资源的互斥访问,其他语言提供了线程锁,golang也有线程锁,当然可以通过channel实现,这里我给出加锁访问资源的方式,因为channel内部也是通过加锁实现的,而且我习惯用channel做协程通信,对于共享资源的控制习惯用锁来控制,也比较高效。

先定义几个全局变量

const (PRODUCER_MAX = 5CONSUMER_MAX = 2PRODUCT_MAX  = 20
)var productcount = 0
var lock sync.Mutex
var wgrp sync.WaitGroup

  

productcount为资源的数量,需要互斥处理。

wgrp主要是主协程用来等待其他协程退出。
PRODUCT_MAX 表示资源的上限,达到该值,生产者停止生产。
PRODUCER_MAX 表示生产者协程数量
CONSUMER_MAX 表示消费者协程数量
我们实现生产者代码

//生产者
func Produce(index int, wgrp *sync.WaitGroup) {defer func() {if err := recover(); err != nil {fmt.Println("Producer ", index, " panic")}wgrp.Done()}()for {time.Sleep(time.Second)lock.Lock()fmt.Println("Producer ", index, " begin produce")if productcount >= PRODUCT_MAX {fmt.Println("Products are full")lock.Unlock()return}productcount++fmt.Println("Products count is ", productcount)lock.Unlock()}
}

defer 的匿名函数主要是用来回收资源,不是重点

for循环内部生产者循环增加资源,为保证productcount的互斥访问,我们加了锁。
当productcount达到上限后解锁并返回,否则就增加数量,然后释放锁。
同样的道理我们实现了消费者

func Consume(index int, wgrp *sync.WaitGroup) {defer func() {if err := recover(); err != nil {fmt.Println("Consumer ", index, " panic")}wgrp.Done()}()for {time.Sleep(time.Second)lock.Lock()fmt.Println("Consumer ", index, " begin consume")if productcount <= 0 {fmt.Println("Products are empty")lock.Unlock()return}productcount--fmt.Println("Products count is ", productcount)lock.Unlock()}
}

  

消费者加锁减少productcount数量,当productcount为0,则解锁并返回。

然后我们实现主函数

func main() {wgrp.Add(PRODUCER_MAX + CONSUMER_MAX)for i := 0; i < PRODUCER_MAX; i++ {go Produce(i, &wgrp)}for i := 0; i < CONSUMER_MAX; i++ {go Consume(i, &wgrp)}wgrp.Wait()
}

我们创建了若干生产者和消费者,主协程通过wgrp等待其他协程退出。

我们看下效果

可以看出并发的访问实现了,但是并没有实现条件等待和控制,比如当数量上限后其他生产者也可以访问。
接下来我们实现的是当数量上限是生产者挂起等待,直到消费者通知其生产。数量为0时消费者挂起,
等待生产者激活。也就是条件等待和异步协同。

实现条件等待和异步协同

协程之间的同步和等待可以使用channel,我们增加了两个全局非缓冲channel

var produce_wait chan struct{}
var consume_wait chan struct{}

produce_wait 用来控制生产者阻塞等待

consume_wait 用来控制消费者阻塞等待

我们修改下生产者

//生产者
func Produce(index int, wgrp *sync.WaitGroup) {defer func() {if err := recover(); err != nil {fmt.Println("Producer ", index, " panic")}wgrp.Done()}()for {time.Sleep(time.Second)lock.Lock()fmt.Println("Producer ", index, " begin produce")if productcount >= PRODUCT_MAX {fmt.Println("Products are full")lock.Unlock()//产品满了,生产者wait<-produce_waitcontinue}lastcount := productcountproductcount++fmt.Println("Products count is ", productcount)lock.Unlock()//产品数由0到1,激活消费者if lastcount == 0 {var consumActive struct{}consume_wait <- consumActive}}
}

在18行增加了<-produce_wait,这样生产者会挂起,等待消费者向produce_wait写入,从而得到激活。

另外26行增加了判断,当资源数由0到1时,激活消费者。
同样消费者实现类似

//消费者
func Consume(index int, wgrp *sync.WaitGroup) {defer func() {if err := recover(); err != nil {fmt.Println("Consumer ", index, " panic")}wgrp.Done()}()for {time.Sleep(time.Second)lock.Lock()fmt.Println("Consumer ", index, " begin consume")if productcount <= 0 {fmt.Println("Products are empty")lock.Unlock()//产品空了,消费者等待<-consume_waitcontinue}lastcount := productcountproductcount--fmt.Println("Products count is ", productcount)lock.Unlock()//产品数由PRODUCT_MAX变少,激活生产者if lastcount == PRODUCT_MAX {var productActive struct{}produce_wait <- productActive}}
}

这里我们要有并发的思想,考虑这样一个场景,当前产品数达到上限,Produce运行完16行,刚刚解锁,还没来得及运行18行挂起,

Consume抢占到锁正常运行消耗资源,运行到28行,优先对produce_wait写入,此时该消费者挂起,生产者收到信号后,
他们都会继续执行。
我们完善下main函数

func main() {wgrp.Add(PRODUCER_MAX + CONSUMER_MAX)produce_wait = make(chan struct{})consume_wait = make(chan struct{})for i := 0; i < CONSUMER_MAX; i++ {go Consume(i, &wgrp)}for i := 0; i < PRODUCER_MAX; i++ {go Produce(i, &wgrp)}wgrp.Wait()
}

执行golang的锁检测并运行

go run -race main.go
可以看到是可以正常运行的

我们继续用并发思想分析,我们实现了基本功能,但是有个瑕疵,我们的生产者协程较多,比如生产者协程1判断生产上限在18行挂起,其他生产者如果抢占锁后进入生产判断数量上限,也会在18行挂起,由于我们的produce_wait是非缓冲的,那么当消费者来激活时,只有一个生产者被激活,另一个一直挂着,等到消费者激活才能继续生产。这么做在一定程度限制了生产者,我们可以通过引入两个bool变量通知其他协程睡眠,避免此问题。

增加bool变量实现休眠

我们可以引入两个bool变量

var stopProduce = false
var stopConsume = false

当资源达到上限或下限时,挂起单个协程,通过这两个变量休眠同类协程。

由于golang没有提供给我们休眠的api,我们就让同类型的协程sleep一会,这样也是可以提高模型并发的。
改进的生产者

//生产者
func Produce(index int, wgrp *sync.WaitGroup) {defer func() {if err := recover(); err != nil {fmt.Println("Producer ", index, " panic")}wgrp.Done()}()for {time.Sleep(time.Second)lock.Lock()if stopProduce {fmt.Println("Producer ", index, " stop produce, sleep 5 seconds")lock.Unlock()time.Sleep(time.Second * 5)continue}fmt.Println("Producer ", index, " begin produce")if productcount >= PRODUCT_MAX {fmt.Println("Products are full")stopProduce = truelock.Unlock()//产品满了,生产者wait<-produce_waitlock.Lock()stopProduce = falselock.Unlock()continue}productcount++fmt.Println("Products count is ", productcount)if stopConsume {var consumActive struct{}consume_wait <- consumActive}lock.Unlock()}
}

我们在22行设置了stopProduce为true,然后在25行挂起了该协程,其他生产者协程发现stopProduce为true,则睡眠5秒。

此办法保证了资源数临界值后仅有单个协程挂起,不会影响到其他同类协程。
同样实现消费者,这里不做赘述。
考虑这样一个场景,如果在生产者设置bool解锁后,其他消费者抢占锁后为了激活生产者,优先写入信道produce_wait,
此时生产者还没有从produce_wait读取,也不会有问题,毕竟生产者迟早要读取。
接下来我们测试下

可以看到当生产者1生产数上限后,其他生产者会进入休眠。当消费者激活后,生产者继续生产,其他生产者休眠后同样可以生产。
提高了并发效率。

源码下载

完整版源码地址
https://github.com/secondtonone1/golang-/tree/master/producerconsumer

感谢关注公众号

golang实现生产者消费者模型相关推荐

  1. 用三个线程实现生产者消费者模型,其中一个线程作为生产者,二个线程作为消费者,生产者随机生产一个时间戳或者字符串,消费者消费这个时间戳,并不能重复消费,并将其打印出来

    题目要求: 用三个线程实现生产者消费者模型,其中一个线程作为生产者,二个线程作为消费者,生产者随机生产一个时间戳或者字符串,消费者消费这个时间戳,并不能重复消费,并将其打印出来.(这是一道百度面试的算 ...

  2. 生产者/消费者模型详解(基于Java)

    title: 生产者消费者模型 tags: 多线程 synchronized 锁 wait() notify() 生产者/消费者模型原理以及代码实现 一.生产者/消费者模型原理 所谓的生产者消费者模型 ...

  3. python 生产消费者_python之生产者消费者模型实现详解

    代码及注释如下 #Auther Bob #--*--conding:utf-8 --*-- #生产者消费者模型,这里的例子是这样的,有一个厨师在做包子,有一个顾客在吃包子,有一个服务员在储存包子,这个 ...

  4. java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...

    导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...

  5. 进程 互斥锁、队列与管道、生产者消费者模型

    目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重 ...

  6. 11.python并发入门(part8 基于线程队列实现生产者消费者模型)

    一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据 ...

  7. 线程同步之经典生产者-消费者模型

    /* 线程同步之生产者-消费者模型 该模型符合以下要求: 1.生产者只在仓储未满时生产,仓满则停止生产: 2.消费者只在仓储未空时消费,仓空则等待: 3.当消费者发现仓储为空时则通知生产者生产: 4. ...

  8. Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...

    一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例 ...

  9. C++编程模拟生产者消费者模型

    生产者消费者问题是操作系统中典型的进程同步互斥问题,(英语:Producer-Consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同 ...

最新文章

  1. android+布局分块,android的List View的Item布局问题
  2. 如何给指定地址空间拍一个快照
  3. 获取Json中特定的值
  4. mybatis做批量删除时写SQL语句时遇到的问题
  5. C# 7.0 新特性:本地方法
  6. pytorchyolov4训练_使用pytorch-yolov5 訓練自己的數據集-2020.6.15
  7. 存储计算解耦合,构建中国人英语语音数据库
  8. CentOS 6.2 Eclipse CDT 开发环境搭建
  9. MySQL 慌了!这个分库分表方法论,要火了?
  10. 软件工程中需要学习和掌握的软件都有哪些_9个B端产品经理需要懂的技术
  11. 安装包被误删了可以用EasyRecovery恢复吗
  12. 通俗理解激活函数作用和常见激活函数总结:sigmoid、tanh、relu、Leaky-relu、P-relu、R-Relu、elu
  13. Luogu1638 逛画展
  14. 2022年计算机软件水平考试嵌入式系统设计师(中级)练习题及答案
  15. win7忘记开机密码怎么办?
  16. 这两款好用的识别图片文字的软件app值得你们收藏
  17. c++ memcpy内存拷贝
  18. Vue 知识点汇总(下)--附案例代码及项目地址
  19. html倒计时还有多少天,2020年只剩70天 2021年倒计时还有多少天?
  20. 在1000万整数中找到前100个最大的数 算法

热门文章

  1. PHP球球大作战代点源码 开源无加密+简单安装说明
  2. MOS管进阶部分,那些你不了解的MOS管知识
  3. java邮箱发送_java邮箱发送报错
  4. RecyclerView+FloatingActionButton应用
  5. 常见的几种移动app开发模式
  6. 夫妻吵架也要有技巧!
  7. pageable设置size_spring – 设置JPA Pageable Object的默认页面大小
  8. 还在复制粘贴做Word?用PDF转Word吧!
  9. 剪辑小技巧,多个横屏视频如何批量转换成竖屏播放
  10. 2021SHD超级街舞梦想营--三明站