生产者与消费者模型

本篇博客代码实现都是在linux环境下跑的

通过条件变量实现

应用场景:针对大量数据的产生与处理的场景

生产与处理是放到不同执行流中完成的,中间会增加一个数据缓冲区,作为中间的数据缓冲场所,例如如果生成速度比处理数据速度快,那就可以将生成了的数据放到该缓冲区中,消费者只管从缓冲区取数据就行,这样子就可以提高它们的工作效率


生产者与消费者模型的优势:
1、降低生成者与消费者之间的耦合度
2、支持忙闲不均,生成者生成速度快,不一样要等到有空闲的消费者,只要将数据放到缓冲区即可,等待消费者去处理
3、支持并发,多个生产者线程和多个消费者同时刻执行自己的任务

生产者与消费者模型的实现
实现该模型的重点难点其实是在实现一个线程安全的缓冲队列

我们模拟一个缓冲队列最大只能放下5个数据,当数据满时,生产者等待,并唤醒消费者,当没有数据时,消费者等待,并唤醒生产者。

队列我们可以使用STL库中的queue容器,但是该容器是非线程安全的,这时候我们就必须自己添加有关信息变量来控制队列,定义一个容量上限capacity,防止内存耗尽导致程序崩溃;定义一个互斥变量mutex,保证资源的安全性;定义两个条件变量productor_condcustomer_cond,保证访问资源的合理性。在保证线程安全的队列的前提下,我们还得向外提供出队与入队的操作push()生成和pop()消费。

代码实现

#include <iostream>
#include <stdio.h>
#include <queue>
#include <pthread.h>
using namespace std;//线程安全的缓冲队列
#define QUEUE_MAX 5
class BlockQueue
{public:BlockQueue(int maxq = QUEUE_MAX):_capacity(maxq){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_pro_cond, NULL);pthread_cond_init(&_cus_cond, NULL);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pro_cond);pthread_cond_destroy(&_cus_cond);}bool push(const int& data){//生产者将数据入队,若数据满了需要阻塞pthread_mutex_lock(&_mutex);while (_queue.size() == _capacity){pthread_cond_wait(&_pro_cond, &_mutex);}//将数据入队_queue.push(data);//解锁pthread_mutex_unlock(&_mutex);//唤醒消费者线程pthread_cond_signal(&_cus_cond);return true;}bool pop(int *data){//消费者将数据出队,若没有数据需要阻塞pthread_mutex_lock(&_mutex);while (_queue.empty()){pthread_cond_wait(&_cus_cond, &_mutex);}//获取队头元素*data = _queue.front();//将数据出队_queue.pop();//解锁pthread_mutex_unlock(&_mutex);//唤醒生产者线程pthread_cond_signal(&_pro_cond);return true;}
private:queue<int> _queue;//简单队列int _capacity;//最大节点数量pthread_mutex_t _mutex;//互斥变量pthread_cond_t _pro_cond;//生产者条件变量pthread_cond_t _cus_cond;//消费者条件变量
};void *thr_productor(void* arg )
{BlockQueue *queue = (BlockQueue*)arg;int i = 0;while (1){//生产者生成数据queue->push(i);printf("productor push data:%d\n", i++);}return NULL;
}
void *thr_customer(void* arg )
{BlockQueue *queue = (BlockQueue*)arg;while (1){//消费者不断获取数据int data;queue->pop(&data);printf("customer pop data:%d\n", data);}return NULL;
}int main()
{int ret, i;pthread_t ptid[4], ctid[4];BlockQueue queue;for (i = 0; i < 4; ++i){ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&queue);if (ret != 0){printf("create productor thread error\n");return -1;}ret = pthread_create(&ctid[i], NULL, thr_customer, (void*)&queue);if (ret != 0){printf("create productor thread error\n");return -1;}}for (i = 0; i < 4; i++){pthread_join(ptid[i], NULL);pthread_join(ctid[i], NULL);}return 0;
}

运行结果:

通过信号量实现

信号量可以用于实现线程或者进程同步与互斥(主要用于同步)
信号量 = 一个计数器 + pcb等待队列
同步原理:通过自身计数器对资源进行计数,并通过计数器的资源计数,判断进程/线程是否能够符合访问资源的条件,若符合就可以访问,若不符合则调用提供的接口使进程/线程阻塞;其他进程/线程促使条件满足之后,可以唤醒pcb等待队列上的进程/线程

互斥原理:保证计数器的计数不大于1,就保证了资源只有一个,并且同一时间只能被一个进程/线程访问

操作流程
1、定义信号量 sem_t sem
2、初始化信号量int sem_init(sem_t *sem, int pshared, unsigned int value) 参数内容(sem:我们定义的信号量;pshared:标识该信号量用于进程还是线程,0表示用于线程间,非0表示用于进程间;value:初始化信号量,初识资源数量有多少该值就为多少) 返回值:成功返回0,失败返回-1
3、在访问临界资源之前,先访问信号量,判断是否能够访问,计数-1。接口1int sem_wait(sem_t *sem) 通过自身计数判断是否满足访问条件,不满足就一直阻塞;接口2int sem_trywait(sem_t *sem) 通过自身计数判断是否满足访问条件,不满足就报错返回;接口3int sem_timewait(sem_t *sem, const struct timespec *abs_timeout) 通过自身计数判断是否满足访问条件,当不满足就等待指定的时间,超时就报错返回
4、促使访问条件满足,计数+1,唤醒阻塞线程/进程int sem_post(sem_t *sem)
5、销毁信号量 int sem_destroy(sem_t *sem)

通过信号量实现一个生产者与消费者模型
使用vector实现队列

定义一个vector数组,实现一个等待队列,定义_capacity,用于指定队列的最大容量,再定义两个指针_step_read_step_write,用于记录读与写的操作位置。定义三个信号量,_lock用于实现互斥;_sem_idle 用于对空闲空间进行计数,对于生产者来有说空闲空间的时候才能写入数据,计数>0,初始化为最大容量;_sem_data 用于对具有数据的空间进行计数,对于消费者来说有数据的空间才能取数据,计数>0,初始化为0。

代码实现:

  1 #include <cstdio>2 #include <iostream>3 #include <vector>4 #include <pthread.h>5 #include <semaphore.h>6 using namespace std;7 8 #define QUEUE_MAX 59 class RingQueue10 {11 public:12     RingQueue(int maxq = QUEUE_MAX)13         :_queue(maxq)14         ,_capacity(maxq)15         ,_step_read(0)16         ,_step_write(0)17     {18         sem_init(&_lock, 0, 1);//实现互斥锁19         sem_init(&_sem_data, 0, 0);//数据空间初始化为020         sem_init(&_sem_idle, 0, maxq);//空闲空间初始化为maxq21     }22     ~RingQueue()23     {24         sem_destroy(&_lock);25         sem_destroy(&_sem_data);26         sem_destroy(&_sem_idle);27     }28     bool push(const int& data)29     {//先判断能否访问在加锁这个顺序不能乱,如果相反了,加锁成功后发现没有空闲结点,就会一直阻塞30         //1、判断是否能访问,不能就阻塞--空闲空间计数的判断,能访问空闲空间计数-131         sem_wait(&_sem_idle);32         //2、能访问就加锁,保护访问过程33         sem_wait(&_lock);//计数不能大于1,-1为可访问34         //3、资源的访问35         _queue[_step_write] = data;36         _step_write = (_step_write + 1) % _capacity;37         //4、解锁38         sem_post(&_lock);//lock计数+1,唤醒因加锁而阻塞的线程39         //5、入队数据之后,数据空间计数+1,唤醒消费者40         sem_post(&_sem_data);41         return true;42     }43     bool pop(int *data)44     {45         sem_wait(&_sem_data);//判断数据空间是否能访问46         sem_wait(&_lock);//有数据则加锁保护访问数据的过程47         *data = _queue[_step_read];48         _step_read = (_step_read + 1) % _capacity;49         sem_post(&_lock);//解锁50         sem_post(&_sem_idle);//唤醒生产者51         return true;52     }53 private:54     vector<int> _queue;55     int _capacity;56     int _step_read;57     int _step_write;58 59     sem_t _lock;60     sem_t _sem_data;61     sem_t _sem_idle;62 };63 void *thr_productor(void *arg)64 {65     RingQueue *queue = (RingQueue*)arg;66     int i = 0;67     while (1)68     {69         queue->push(i);70         printf("productor push data:%d\n", i++);71     }72     return NULL;73 }74 75 76 void *thr_customer(void *arg)77 {78     RingQueue *queue = (RingQueue*)arg;79     while (1)80     {81         int data;82         queue->pop(&data);83         printf("customer pop data:%d\n", data);84     }85     return NULL;86 }87 88 int main()89 {90     int ret, i;91     pthread_t ptid[4], ctid[4];92     RingQueue queue;93 94     for (i = 0; i < 4; ++i)95     {96         ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&queue);97         if (ret != 0)98         {99             printf("create productor thread error\n");
100             return -1;
101         }
102         ret = pthread_create(&ctid[i], NULL, thr_customer, (void*)&queue);
103         if (ret != 0)
104         {105             printf("create productor thread error\n");
106             return -1;
107         }
108     }
109     for (i = 0; i < 4; ++i)
110     {111         pthread_join(ptid[i], NULL);
112         pthread_join(ctid[i], NULL);
113     }
114     return 0;
115 }

运行结果:

Linux 生产者与消费者模型C++实现相关推荐

  1. linux进程间通信:system V 信号量 生产者和消费者模型编程案例

    生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...

  2. Linux系统编程:使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型

    代码实现 如题,使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型.本来是想只用信号量实现生产者消费者模型的,但是发现 只能在一个生产者和一个消费者之间,要在多个生产者和消费者模 ...

  3. 【Linux下】 线程同步 生产者与消费者模型

    文章目录 [Linux下] 线程同步 生产者与消费者模型 线程同步 同步概念与竞态条件 条件变量 条件变量本质 操作条件变量 初始化和销毁条件变量 等待 唤醒 通过条件变量实现的简单线程同步例子 为什 ...

  4. Linux系统编程---17(条件变量及其函数,生产者消费者条件变量模型,生产者与消费者模型(线程安全队列),条件变量优点,信号量及其主要函数,信号量与条件变量的区别,)

    条件变量 条件变量本身不是锁!但它也可以造成线程阻塞.通常与互斥锁配合使用.给多线程提供一个会合的场所. 主要应用函数: pthread_cond_init 函数 pthread_cond_destr ...

  5. Linux系统编程40:多线程之基于环形队列的生产者与消费者模型

    文章目录 (1)什么是信号量 (2)与信号量相关的操作 (3)基于环形队列的生产者与消费者模型-信号量(单消费者单生产者) (1)什么是信号量 前面的叙述中,我们通过锁保证了每次只有一个线程进入临界区 ...

  6. Linux系统编程39:多线程之基于阻塞队列生产者与消费者模型

    文章目录 (1)生产者与消费者模型概述 (2)生产者与消费者模型优点 (3)基于阻塞队列(blockingqueue)的生产者消费者模型(单消费者单生产者) (4)基于阻塞队列(blockingque ...

  7. Linux下生产者与消费者模型

    1. 概念   有一个或多个生产者生产某种类型的数据,并放在缓冲区里(生产者),有一个消费者从缓冲区中取数据,每次取一项(消费者).系统保证任何时候只有一个主题可以访问缓存区.所以当生产满时,生产者不 ...

  8. 【Linux】生产者与消费者模型、信号量、死锁

    目录 死锁 死锁的产生场景 死锁的gdb分析 1.通过调试可执行程序来分析 2.通过调试正在运行的程序 死锁的必要条件 死锁的预防 生产者与消费者模型 123规则 应用场景及特点 代码实现: 信号量 ...

  9. 并发无锁队列学习(单生产者单消费者模型)

    1.引言 本文介绍单生产者单消费者模型的队列.依据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种. 单生产者单消费者模型的队列操作过程是不须要进行加锁的.生产 ...

最新文章

  1. wampserver修改mysql用户密码
  2. clickhouse一键登陆
  3. java单例注册表_Java单例模式(Singleton)
  4. 神经网络与深度学习——TensorFlow2.0实战(笔记)(五)(NumPy科学计算库<2>python)
  5. 第三只眼使用局域网版本还是网络版好_让汽车的“第三只眼”更聪明更安全
  6. 林期苏曼属性标签编辑_标签打印软件如何打印指定日期
  7. github的使用 sourceTree
  8. MySQL卸载不干净问题,connector net卸不掉
  9. 微信ubuntu版服务器,Ubuntu 18.04 安装微信(Linux通用)
  10. Unity下落式音游实现——(5)根据音乐生成滑块
  11. ps的切片用来转换html,Photoshop切片导出HTML+CSS
  12. 水果店怎么搞活动方案,水果店促销活动方案
  13. 局域网搭建IOS应用在线安装环境
  14. “apt-get update”命令
  15. Nand Flash管理算法介绍之FTL简介
  16. 绘图软件origin使用总结
  17. java 线程 组成_java多线程
  18. 子类方法中super.父类方法
  19. 智慧民航新业态崭露头角,图扑数字孪生入局民航飞联网
  20. VMware虚拟机安装windows2008系统

热门文章

  1. android 日历日期,android 日历 开始日期 与结束日期
  2. html 的css骚操作,意想不到的 CSS 伪元素 before/after 各种骚操作 - 文章教程
  3. php swoole yaf,swoole和yaf的区别
  4. python异常处理有什么意义_怎么学好Python异常处理 五种处理机制是什么
  5. TCP洪水攻击(SYN Flood)的诊断和处理
  6. 基于JAVA+SpringMVC+MYSQL的学生信息管理系统
  7. 基于JAVA+SpringMVC+Mybatis+MYSQL的精美酒店管理系统
  8. [******] 堆排序
  9. JavaScript--动态添加元素
  10. Eclipse+Tomcat+MAVEN+SVN项目完整环境搭建