Linux 生产者与消费者模型C++实现
生产者与消费者模型
本篇博客代码实现都是在linux环境下跑的
通过条件变量实现
应用场景:针对大量数据的产生与处理的场景
生产与处理是放到不同执行流中完成的,中间会增加一个数据缓冲区,作为中间的数据缓冲场所,例如如果生成速度比处理数据速度快,那就可以将生成了的数据放到该缓冲区中,消费者只管从缓冲区取数据就行,这样子就可以提高它们的工作效率
生产者与消费者模型的优势:
1、降低生成者与消费者之间的耦合度
2、支持忙闲不均,生成者生成速度快,不一样要等到有空闲的消费者,只要将数据放到缓冲区即可,等待消费者去处理
3、支持并发,多个生产者线程和多个消费者同时刻执行自己的任务
生产者与消费者模型的实现
实现该模型的重点难点其实是在实现一个线程安全的缓冲队列
我们模拟一个缓冲队列最大只能放下5个数据,当数据满时,生产者等待,并唤醒消费者,当没有数据时,消费者等待,并唤醒生产者。
队列我们可以使用STL库中的queue容器,但是该容器是非线程安全的,这时候我们就必须自己添加有关信息变量来控制队列,定义一个容量上限capacity
,防止内存耗尽导致程序崩溃;定义一个互斥变量mutex
,保证资源的安全性;定义两个条件变量productor_cond
和customer_cond
,保证访问资源的合理性。在保证线程安全的队列的前提下,我们还得向外提供出队与入队的操作push()
生成和pop()
消费。
代码实现
#include <iostream>
#include <stdio.h>
#include <queue>
#include <pthread.h>
using namespace std;//线程安全的缓冲队列
#define QUEUE_MAX 5
class BlockQueue
{public:BlockQueue(int maxq = QUEUE_MAX):_capacity(maxq){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_pro_cond, NULL);pthread_cond_init(&_cus_cond, NULL);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pro_cond);pthread_cond_destroy(&_cus_cond);}bool push(const int& data){//生产者将数据入队,若数据满了需要阻塞pthread_mutex_lock(&_mutex);while (_queue.size() == _capacity){pthread_cond_wait(&_pro_cond, &_mutex);}//将数据入队_queue.push(data);//解锁pthread_mutex_unlock(&_mutex);//唤醒消费者线程pthread_cond_signal(&_cus_cond);return true;}bool pop(int *data){//消费者将数据出队,若没有数据需要阻塞pthread_mutex_lock(&_mutex);while (_queue.empty()){pthread_cond_wait(&_cus_cond, &_mutex);}//获取队头元素*data = _queue.front();//将数据出队_queue.pop();//解锁pthread_mutex_unlock(&_mutex);//唤醒生产者线程pthread_cond_signal(&_pro_cond);return true;}
private:queue<int> _queue;//简单队列int _capacity;//最大节点数量pthread_mutex_t _mutex;//互斥变量pthread_cond_t _pro_cond;//生产者条件变量pthread_cond_t _cus_cond;//消费者条件变量
};void *thr_productor(void* arg )
{BlockQueue *queue = (BlockQueue*)arg;int i = 0;while (1){//生产者生成数据queue->push(i);printf("productor push data:%d\n", i++);}return NULL;
}
void *thr_customer(void* arg )
{BlockQueue *queue = (BlockQueue*)arg;while (1){//消费者不断获取数据int data;queue->pop(&data);printf("customer pop data:%d\n", data);}return NULL;
}int main()
{int ret, i;pthread_t ptid[4], ctid[4];BlockQueue queue;for (i = 0; i < 4; ++i){ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&queue);if (ret != 0){printf("create productor thread error\n");return -1;}ret = pthread_create(&ctid[i], NULL, thr_customer, (void*)&queue);if (ret != 0){printf("create productor thread error\n");return -1;}}for (i = 0; i < 4; i++){pthread_join(ptid[i], NULL);pthread_join(ctid[i], NULL);}return 0;
}
运行结果:
通过信号量实现
信号量可以用于实现线程或者进程同步与互斥(主要用于同步)
信号量 = 一个计数器 + pcb等待队列
同步原理:通过自身计数器对资源进行计数,并通过计数器的资源计数,判断进程/线程是否能够符合访问资源的条件,若符合就可以访问,若不符合则调用提供的接口使进程/线程阻塞;其他进程/线程促使条件满足之后,可以唤醒pcb等待队列上的进程/线程
互斥原理:保证计数器的计数不大于1,就保证了资源只有一个,并且同一时间只能被一个进程/线程访问
操作流程
1、定义信号量 sem_t sem
2、初始化信号量int sem_init(sem_t *sem, int pshared, unsigned int value)
参数内容(sem
:我们定义的信号量;pshared
:标识该信号量用于进程还是线程,0表示用于线程间,非0表示用于进程间;value
:初始化信号量,初识资源数量有多少该值就为多少) 返回值:成功返回0,失败返回-1
3、在访问临界资源之前,先访问信号量,判断是否能够访问,计数-1。接口1int sem_wait(sem_t *sem)
通过自身计数判断是否满足访问条件,不满足就一直阻塞;接口2int sem_trywait(sem_t *sem)
通过自身计数判断是否满足访问条件,不满足就报错返回;接口3int sem_timewait(sem_t *sem, const struct timespec *abs_timeout)
通过自身计数判断是否满足访问条件,当不满足就等待指定的时间,超时就报错返回
4、促使访问条件满足,计数+1,唤醒阻塞线程/进程int sem_post(sem_t *sem)
5、销毁信号量 int sem_destroy(sem_t *sem)
通过信号量实现一个生产者与消费者模型
使用vector实现队列
定义一个vector
数组,实现一个等待队列,定义_capacity
,用于指定队列的最大容量,再定义两个指针_step_read
和_step_write
,用于记录读与写的操作位置。定义三个信号量,_lock
用于实现互斥;_sem_idle
用于对空闲空间进行计数,对于生产者来有说空闲空间的时候才能写入数据,计数>0,初始化为最大容量;_sem_data
用于对具有数据的空间进行计数,对于消费者来说有数据的空间才能取数据,计数>0,初始化为0。
代码实现:
1 #include <cstdio>2 #include <iostream>3 #include <vector>4 #include <pthread.h>5 #include <semaphore.h>6 using namespace std;7 8 #define QUEUE_MAX 59 class RingQueue10 {11 public:12 RingQueue(int maxq = QUEUE_MAX)13 :_queue(maxq)14 ,_capacity(maxq)15 ,_step_read(0)16 ,_step_write(0)17 {18 sem_init(&_lock, 0, 1);//实现互斥锁19 sem_init(&_sem_data, 0, 0);//数据空间初始化为020 sem_init(&_sem_idle, 0, maxq);//空闲空间初始化为maxq21 }22 ~RingQueue()23 {24 sem_destroy(&_lock);25 sem_destroy(&_sem_data);26 sem_destroy(&_sem_idle);27 }28 bool push(const int& data)29 {//先判断能否访问在加锁这个顺序不能乱,如果相反了,加锁成功后发现没有空闲结点,就会一直阻塞30 //1、判断是否能访问,不能就阻塞--空闲空间计数的判断,能访问空闲空间计数-131 sem_wait(&_sem_idle);32 //2、能访问就加锁,保护访问过程33 sem_wait(&_lock);//计数不能大于1,-1为可访问34 //3、资源的访问35 _queue[_step_write] = data;36 _step_write = (_step_write + 1) % _capacity;37 //4、解锁38 sem_post(&_lock);//lock计数+1,唤醒因加锁而阻塞的线程39 //5、入队数据之后,数据空间计数+1,唤醒消费者40 sem_post(&_sem_data);41 return true;42 }43 bool pop(int *data)44 {45 sem_wait(&_sem_data);//判断数据空间是否能访问46 sem_wait(&_lock);//有数据则加锁保护访问数据的过程47 *data = _queue[_step_read];48 _step_read = (_step_read + 1) % _capacity;49 sem_post(&_lock);//解锁50 sem_post(&_sem_idle);//唤醒生产者51 return true;52 }53 private:54 vector<int> _queue;55 int _capacity;56 int _step_read;57 int _step_write;58 59 sem_t _lock;60 sem_t _sem_data;61 sem_t _sem_idle;62 };63 void *thr_productor(void *arg)64 {65 RingQueue *queue = (RingQueue*)arg;66 int i = 0;67 while (1)68 {69 queue->push(i);70 printf("productor push data:%d\n", i++);71 }72 return NULL;73 }74 75 76 void *thr_customer(void *arg)77 {78 RingQueue *queue = (RingQueue*)arg;79 while (1)80 {81 int data;82 queue->pop(&data);83 printf("customer pop data:%d\n", data);84 }85 return NULL;86 }87 88 int main()89 {90 int ret, i;91 pthread_t ptid[4], ctid[4];92 RingQueue queue;93 94 for (i = 0; i < 4; ++i)95 {96 ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&queue);97 if (ret != 0)98 {99 printf("create productor thread error\n");
100 return -1;
101 }
102 ret = pthread_create(&ctid[i], NULL, thr_customer, (void*)&queue);
103 if (ret != 0)
104 {105 printf("create productor thread error\n");
106 return -1;
107 }
108 }
109 for (i = 0; i < 4; ++i)
110 {111 pthread_join(ptid[i], NULL);
112 pthread_join(ctid[i], NULL);
113 }
114 return 0;
115 }
运行结果:
Linux 生产者与消费者模型C++实现相关推荐
- linux进程间通信:system V 信号量 生产者和消费者模型编程案例
生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...
- Linux系统编程:使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型
代码实现 如题,使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型.本来是想只用信号量实现生产者消费者模型的,但是发现 只能在一个生产者和一个消费者之间,要在多个生产者和消费者模 ...
- 【Linux下】 线程同步 生产者与消费者模型
文章目录 [Linux下] 线程同步 生产者与消费者模型 线程同步 同步概念与竞态条件 条件变量 条件变量本质 操作条件变量 初始化和销毁条件变量 等待 唤醒 通过条件变量实现的简单线程同步例子 为什 ...
- Linux系统编程---17(条件变量及其函数,生产者消费者条件变量模型,生产者与消费者模型(线程安全队列),条件变量优点,信号量及其主要函数,信号量与条件变量的区别,)
条件变量 条件变量本身不是锁!但它也可以造成线程阻塞.通常与互斥锁配合使用.给多线程提供一个会合的场所. 主要应用函数: pthread_cond_init 函数 pthread_cond_destr ...
- Linux系统编程40:多线程之基于环形队列的生产者与消费者模型
文章目录 (1)什么是信号量 (2)与信号量相关的操作 (3)基于环形队列的生产者与消费者模型-信号量(单消费者单生产者) (1)什么是信号量 前面的叙述中,我们通过锁保证了每次只有一个线程进入临界区 ...
- Linux系统编程39:多线程之基于阻塞队列生产者与消费者模型
文章目录 (1)生产者与消费者模型概述 (2)生产者与消费者模型优点 (3)基于阻塞队列(blockingqueue)的生产者消费者模型(单消费者单生产者) (4)基于阻塞队列(blockingque ...
- Linux下生产者与消费者模型
1. 概念 有一个或多个生产者生产某种类型的数据,并放在缓冲区里(生产者),有一个消费者从缓冲区中取数据,每次取一项(消费者).系统保证任何时候只有一个主题可以访问缓存区.所以当生产满时,生产者不 ...
- 【Linux】生产者与消费者模型、信号量、死锁
目录 死锁 死锁的产生场景 死锁的gdb分析 1.通过调试可执行程序来分析 2.通过调试正在运行的程序 死锁的必要条件 死锁的预防 生产者与消费者模型 123规则 应用场景及特点 代码实现: 信号量 ...
- 并发无锁队列学习(单生产者单消费者模型)
1.引言 本文介绍单生产者单消费者模型的队列.依据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种. 单生产者单消费者模型的队列操作过程是不须要进行加锁的.生产 ...
最新文章
- wampserver修改mysql用户密码
- clickhouse一键登陆
- java单例注册表_Java单例模式(Singleton)
- 神经网络与深度学习——TensorFlow2.0实战(笔记)(五)(NumPy科学计算库<2>python)
- 第三只眼使用局域网版本还是网络版好_让汽车的“第三只眼”更聪明更安全
- 林期苏曼属性标签编辑_标签打印软件如何打印指定日期
- github的使用 sourceTree
- MySQL卸载不干净问题,connector net卸不掉
- 微信ubuntu版服务器,Ubuntu 18.04 安装微信(Linux通用)
- Unity下落式音游实现——(5)根据音乐生成滑块
- ps的切片用来转换html,Photoshop切片导出HTML+CSS
- 水果店怎么搞活动方案,水果店促销活动方案
- 局域网搭建IOS应用在线安装环境
- “apt-get update”命令
- Nand Flash管理算法介绍之FTL简介
- 绘图软件origin使用总结
- java 线程 组成_java多线程
- 子类方法中super.父类方法
- 智慧民航新业态崭露头角,图扑数字孪生入局民航飞联网
- VMware虚拟机安装windows2008系统
热门文章
- android 日历日期,android 日历 开始日期 与结束日期
- html 的css骚操作,意想不到的 CSS 伪元素 before/after 各种骚操作 - 文章教程
- php swoole yaf,swoole和yaf的区别
- python异常处理有什么意义_怎么学好Python异常处理 五种处理机制是什么
- TCP洪水攻击(SYN Flood)的诊断和处理
- 基于JAVA+SpringMVC+MYSQL的学生信息管理系统
- 基于JAVA+SpringMVC+Mybatis+MYSQL的精美酒店管理系统
- [******] 堆排序
- JavaScript--动态添加元素
- Eclipse+Tomcat+MAVEN+SVN项目完整环境搭建