C++实现生产者和消费者模型

  • C++实现生产者和消费者模型
    • 1、实现细节
    • 2、单生产者-单消费者模型
    • 3、单生产者-多消费者模型
    • 4、多生产者-单消费者模型
    • 5、多生产者-多消费者模型
  • 参考

C++实现生产者和消费者模型

1、实现细节

  • 具体的实现逻辑是构建一个queue来存储生产的数据,queue不满时可以生产,不空时可以消费。
  • 对于这个队列,采用阻塞队列的实现思路。
  • 先实现构造函数,初始化一个unique_lock供condition_variable使用。
    • 如何在类里面使用unique_lock等需要初始化,并且初始化会加锁的对象。这要研究下。我的理解是构造列表初始化,然后函数体里unlock。
    • 对于条件变量,申请两个,分别控制consumer和producer。
  • 然后就是入和出队列的细节。
    • 首先加锁。
    • 循环判断一下目前的队列情况,对于各自的特殊情况(队满和队空)进行处理。
    • 唤醒一个线程来处理特殊情况。
    • 等待处理完毕。
    • 处理入和出队列操作。
    • 最后释放锁。

2、单生产者-单消费者模型

  • 单生产者-单消费者模型中只有一个生产者和一个消费者,
  • 生产者不停地往产品库中放入产品,
  • 消费者则从产品库中取走产品,
  • 产品库容积有限制,只能容纳一定数目的产品,
  • 如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,
  • 相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。

C++11实现单生产者单消费者模型的代码如下:

#include <unistd.h>#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>static const int bufSize = 10; // Item buffer size.
static const int ProNum = 20;   // How many items we plan to produce.struct resource {int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列.size_t read_pos; // 消费者读取产品位置.size_t write_pos; // 生产者写入产品位置.std::mutex mtx; // 互斥量,保护产品缓冲区std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满.std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空.
} instance; // 产品库全局变量, 生产者和消费者操作该变量.typedef struct resource resource;void Producer(resource *ir, int item)
{std::unique_lock<std::mutex> lock(ir->mtx);while (((ir->write_pos + 1) % bufSize)== ir->read_pos) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.}(ir->buf)[ir->write_pos] = item; // 写入产品.(ir->write_pos)++; // 写入位置后移.if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置.ir->write_pos = 0;(ir->not_empty).notify_all(); // 通知消费者产品库不为空.
}int Consumer(resource *ir)
{int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while (ir->write_pos == ir->read_pos) {std::cout << "Consumer is waiting for items...\n";(ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.}data = (ir->buf)[ir->read_pos]; // 读取某一产品(ir->read_pos)++; // 读取位置后移if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位.ir->read_pos = 0;(ir->not_full).notify_all(); // 通知消费者产品库不为满.return data; // 返回产品.
}void ProducerTask() // 生产者任务
{for (int i = 1; i <= ProNum; ++i) {// sleep(1);std::cout << "Produce the " << i << "^th item..." << std::endl;Producer(&instance, i); // 循环生产 ProNum 个产品.}
}void ConsumerTask() // 消费者任务
{static int cnt = 0;while (1) {sleep(1);int item = Consumer(&instance); // 消费一个产品.std::cout << "Consume the " << item << "^th item" << std::endl;if (++cnt == ProNum) break; // 如果产品消费个数为 ProNum, 则退出.}
}void Initresource(resource *ir)
{ir->write_pos = 0; // 初始化产品写入位置.ir->read_pos = 0; // 初始化产品读取位置.
}int main()
{Initresource(&instance);std::thread producer(ProducerTask); // 创建生产者线程.std::thread consumer(ConsumerTask); // 创建消费之线程.producer.join();consumer.join();
}

3、单生产者-多消费者模型

与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:

#include <unistd.h>#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>static const int bufSize = 8; // Item buffer size.
static const int ProNum = 30;   // How many items we plan to produce.struct resource {int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列.size_t read_pos; // 消费者读取产品位置.size_t write_pos; // 生产者写入产品位置.size_t item_counter;std::mutex mtx; // 互斥量,保护产品缓冲区std::mutex item_counter_mtx;std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满.std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空.
} instance; // 产品库全局变量, 生产者和消费者操作该变量.typedef struct resource resource;void Producer(resource *ir, int item)
{std::unique_lock<std::mutex> lock(ir->mtx);while (((ir->write_pos + 1) % bufSize)== ir->read_pos) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.}(ir->buf)[ir->write_pos] = item; // 写入产品.(ir->write_pos)++; // 写入位置后移.if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置.ir->write_pos = 0;(ir->not_empty).notify_all(); // 通知消费者产品库不为空.lock.unlock(); // 解锁.
}int Consumer(resource *ir)
{int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while (ir->write_pos == ir->read_pos) {std::cout << "Consumer is waiting for items...\n";(ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.}data = (ir->buf)[ir->read_pos]; // 读取某一产品(ir->read_pos)++; // 读取位置后移if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位.ir->read_pos = 0;(ir->not_full).notify_all(); // 通知消费者产品库不为满.lock.unlock(); // 解锁.return data; // 返回产品.
}void ProducerTask() // 生产者任务
{for (int i = 1; i <= ProNum; ++i) {// sleep(1);std::cout << "Producer thread " << std::this_thread::get_id()<< " producing the " << i << "^th item..." << std::endl;Producer(&instance, i); // 循环生产 ProNum 个产品.}std::cout << "Producer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl;
}void ConsumerTask() // 消费者任务
{bool ready_to_exit = false;while (1) {sleep(1);std::unique_lock<std::mutex> lock(instance.item_counter_mtx);if (instance.item_counter < ProNum) {int item = Consumer(&instance);++(instance.item_counter);std::cout << "Consumer thread " << std::this_thread::get_id()<< " is consuming the " << item << "^th item" << std::endl;}elseready_to_exit = true;if (ready_to_exit == true)break;}std::cout << "Consumer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl;
}void Initresource(resource *ir)
{ir->write_pos = 0; // 初始化产品写入位置.ir->read_pos = 0; // 初始化产品读取位置.ir->item_counter = 0;
}int main()
{Initresource(&instance);std::thread producer(ProducerTask);std::thread consumer1(ConsumerTask);std::thread consumer2(ConsumerTask);std::thread consumer3(ConsumerTask);std::thread consumer4(ConsumerTask);producer.join();consumer1.join();consumer2.join();consumer3.join();consumer4.join();
}

4、多生产者-单消费者模型

与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:

#include <unistd.h>#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>static const int bufSize = 8; // Item buffer size.
static const int ProNum = 20;   // How many items we plan to produce.struct resource {int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列.size_t read_pos; // 消费者读取产品位置.size_t write_pos; // 生产者写入产品位置.size_t item_counter;std::mutex mtx; // 互斥量,保护产品缓冲区std::mutex item_counter_mtx;std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满.std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空.
} instance; // 产品库全局变量, 生产者和消费者操作该变量.typedef struct resource resource;void Producer(resource *ir, int item)
{std::unique_lock<std::mutex> lock(ir->mtx);while (((ir->write_pos + 1) % bufSize)== ir->read_pos) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.}(ir->buf)[ir->write_pos] = item; // 写入产品.(ir->write_pos)++; // 写入位置后移.if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置.ir->write_pos = 0;(ir->not_empty).notify_all(); // 通知消费者产品库不为空.
}int Consumer(resource *ir)
{int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while (ir->write_pos == ir->read_pos) {std::cout << "Consumer is waiting for items...\n";(ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.}data = (ir->buf)[ir->read_pos]; // 读取某一产品(ir->read_pos)++; // 读取位置后移if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位.ir->read_pos = 0;(ir->not_full).notify_all(); // 通知消费者产品库不为满.return data; // 返回产品.
}void ProducerTask() // 生产者任务
{bool ready_to_exit = false;while (1) {sleep(1);std::unique_lock<std::mutex> lock(instance.item_counter_mtx);if (instance.item_counter < ProNum) {++(instance.item_counter);Producer(&instance, instance.item_counter);std::cout << "Producer thread " << std::this_thread::get_id()<< " is producing the " << instance.item_counter<< "^th item" << std::endl;}else ready_to_exit = true;if (ready_to_exit == true) break;}std::cout << "Producer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl;
}void ConsumerTask() // 消费者任务
{static int cnt = 0;while (1) {sleep(1);cnt++;if (cnt <= ProNum){int item = Consumer(&instance); // 消费一个产品.std::cout << "Consumer thread " << std::this_thread::get_id()<< " is consuming the " << item << "^th item" << std::endl;}elsebreak; // 如果产品消费个数为 ProNum, 则退出.}std::cout << "Consumer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl;
}void Initresource(resource *ir)
{ir->write_pos = 0; // 初始化产品写入位置.ir->read_pos = 0; // 初始化产品读取位置.ir->item_counter = 0;
}int main()
{Initresource(&instance);std::thread producer1(ProducerTask);std::thread producer2(ProducerTask);std::thread producer3(ProducerTask);std::thread producer4(ProducerTask);std::thread consumer(ConsumerTask);producer1.join();producer2.join();producer3.join();producer4.join();consumer.join();
}

5、多生产者-多消费者模型

该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。

#include <unistd.h>#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>static const int bufSize = 8; // Item buffer size.
static const int ProNum = 20;   // How many items we plan to produce.struct resource {int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列.size_t read_pos; // 消费者读取产品位置.size_t write_pos; // 生产者写入产品位置.size_t pro_item_counter;size_t con_item_counter;std::mutex mtx; // 互斥量,保护产品缓冲区std::mutex pro_mtx;std::mutex con_mtx;std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满.std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空.
} instance; // 产品库全局变量, 生产者和消费者操作该变量.typedef struct resource resource;void Producer(resource *ir, int item)
{std::unique_lock<std::mutex> lock(ir->mtx);while (((ir->write_pos + 1) % bufSize)== ir->read_pos) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.}(ir->buf)[ir->write_pos] = item; // 写入产品.(ir->write_pos)++; // 写入位置后移.if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置.ir->write_pos = 0;(ir->not_empty).notify_all(); // 通知消费者产品库不为空.
}int Consumer(resource *ir)
{int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while (ir->write_pos == ir->read_pos) {std::cout << "Consumer is waiting for items...\n";(ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.}data = (ir->buf)[ir->read_pos]; // 读取某一产品(ir->read_pos)++; // 读取位置后移if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位.ir->read_pos = 0;(ir->not_full).notify_all(); // 通知消费者产品库不为满.return data; // 返回产品.
}void ProducerTask() // 生产者任务
{bool ready_to_exit = false;while (1) {sleep(1);std::unique_lock<std::mutex> lock(instance.pro_mtx);if (instance.pro_item_counter < ProNum) {++(instance.pro_item_counter);Producer(&instance, instance.pro_item_counter);std::cout << "Producer thread " << std::this_thread::get_id()<< " is producing the " << instance.pro_item_counter<< "^th item" << std::endl;}else ready_to_exit = true;lock.unlock();if (ready_to_exit == true) break;}std::cout << "Producer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl;
}void ConsumerTask() // 消费者任务
{bool ready_to_exit = false;while (1) {sleep(1);std::unique_lock<std::mutex> lock(instance.con_mtx);if (instance.con_item_counter < ProNum) {int item = Consumer(&instance);++(instance.con_item_counter);std::cout << "Consumer thread " << std::this_thread::get_id()<< " is consuming the " << item << "^th item" << std::endl;}elseready_to_exit = true;lock.unlock();if (ready_to_exit == true)break;}std::cout << "Consumer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl;
}void Initresource(resource *ir)
{ir->write_pos = 0; // 初始化产品写入位置.ir->read_pos = 0; // 初始化产品读取位置.ir->pro_item_counter = 0;ir->con_item_counter = 0;
}int main()
{Initresource(&instance);std::thread producer1(ProducerTask);std::thread producer2(ProducerTask);std::thread producer3(ProducerTask);std::thread producer4(ProducerTask);std::thread consumer1(ConsumerTask);std::thread consumer2(ConsumerTask);std::thread consumer3(ConsumerTask);std::thread consumer4(ConsumerTask);producer1.join();producer2.join();producer3.join();producer4.join();consumer1.join();consumer2.join();consumer3.join();consumer4.join();return 0;
}

参考

1、https://www.cnblogs.com/haippy/p/3252092.html
2、https://blog.csdn.net/qq_41681241/article/details/86708303
3、https://blog.csdn.net/h_wulingfei/article/details/104897449

C++实现生产者和消费者模型相关推荐

  1. linux进程间通信:system V 信号量 生产者和消费者模型编程案例

    生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...

  2. python生产和消费模型_python queue和生产者和消费者模型

    queue队列 当必须安全地在多个线程之间交换信息时,队列在线程编程中特别有用. classqueue.Queue(maxsize=0) #先入先出classqueue.LifoQueue(maxsi ...

  3. Linux系统编程---17(条件变量及其函数,生产者消费者条件变量模型,生产者与消费者模型(线程安全队列),条件变量优点,信号量及其主要函数,信号量与条件变量的区别,)

    条件变量 条件变量本身不是锁!但它也可以造成线程阻塞.通常与互斥锁配合使用.给多线程提供一个会合的场所. 主要应用函数: pthread_cond_init 函数 pthread_cond_destr ...

  4. 计算机操作系统生产者和消费者模型的简单介绍

    同步互斥小口诀 画图理解题目 判断题目类型 分析进程数目 填写进程模板 补充基本代码(伪代码) 补充PV代码 检查调整代码 注意事项 代码是一步一步写出来的,代码是反复调整写出来的 60%是生产者和消 ...

  5. 并发无锁队列学习(单生产者单消费者模型)

    1.引言 本文介绍单生产者单消费者模型的队列.依据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种. 单生产者单消费者模型的队列操作过程是不须要进行加锁的.生产 ...

  6. Linux系统编程40:多线程之基于环形队列的生产者与消费者模型

    文章目录 (1)什么是信号量 (2)与信号量相关的操作 (3)基于环形队列的生产者与消费者模型-信号量(单消费者单生产者) (1)什么是信号量 前面的叙述中,我们通过锁保证了每次只有一个线程进入临界区 ...

  7. Linux系统编程39:多线程之基于阻塞队列生产者与消费者模型

    文章目录 (1)生产者与消费者模型概述 (2)生产者与消费者模型优点 (3)基于阻塞队列(blockingqueue)的生产者消费者模型(单消费者单生产者) (4)基于阻塞队列(blockingque ...

  8. Go语言编程:使用条件变量Cond和channel通道实现多个生产者和消费者模型

    如题,使用条件变量Cond和channel通道实现多个生产者和消费者模型.Go语言天生带有C语言的基因,很多东西和C与很像,但是用起来 绝对比C语言方便.今天用Go语言来实现下多消费者和生产者模型.如 ...

  9. Linux系统编程:使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型

    代码实现 如题,使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型.本来是想只用信号量实现生产者消费者模型的,但是发现 只能在一个生产者和一个消费者之间,要在多个生产者和消费者模 ...

  10. Linux 生产者与消费者模型C++实现

    生产者与消费者模型 本篇博客代码实现都是在linux环境下跑的 通过条件变量实现 应用场景:针对大量数据的产生与处理的场景 生产与处理是放到不同执行流中完成的,中间会增加一个数据缓冲区,作为中间的数据 ...

最新文章

  1. (0087)iOS开发之NSString属性为什么要用copy来修饰?
  2. python lambda表达式的使用方法(匿名函数)
  3. BZOJ2597 WC2007剪刀石头布(费用流)
  4. 对话框找不到WM_ERASEBKGND消息的解决方法与对话框背景图片的载入方法
  5. 【开源】本周不容错过开源论文,含分割、检索、神经渲染、deepfake 检测、超分、视频相关等...
  6. 常用正则表达式锦集与Python中正则表达式的用法
  7. 牛客网刷题知识汇总2
  8. 住个酒店,“我”的隐私就被强制扒干净了!
  9. 解决Python查询Mysql中文乱码问题
  10. 《Deep Snake for Real-Time Instance Segmentation》
  11. bootstrap-select 插件使用详解
  12. 个人发卡系统 - 个人发卡网_忆当站长经历,第三方发卡平台跑路,小淘自建即时到账支付系统...
  13. pthread 立即停止线程_pthread_create线程终止问题
  14. 非线性拟合matlab代码,Matlab非线性拟合
  15. 【听课笔记】复旦大学遗传学_05染色体畸变
  16. 基于PHP课程网站设计开题报告,在线课程网站设计开题报告
  17. java六个必须理解的问题+java学习方法
  18. 细说华为和荣耀的关系:潮流的荣耀和稳重的华为
  19. 图片怎么转jpg?教你两个超简单的图片转jpg格式的方法
  20. 视频硬字幕提取方法(可完全离线),开发个小工具辅助一下

热门文章

  1. 计算机键盘上切换账户怎么办,笔记本切换小键盘,详细教您笔记本小键盘怎么切换...
  2. HTML中怎样把文字分两栏显示,word设置一页分两栏的三种方法
  3. xtrabackup安装、进行全量备份增量备份
  4. P2P网络ISIS的PSNP报文的两种用途
  5. axure后台管理系统原型rp
  6. php怎样获取当前时间,php中获取当前时间的函数
  7. HDU 4937Lucky Number
  8. DevC++ 报错[Error] Id returned 1 exit status
  9. 程序员真的是吃青春饭吗?分享我的6点面试经验,分分钟搞定!
  10. myql 查询树形表结果:说说、说说的评论、评论的回复