生产者和消费者(linux)
生产者和消费者
- 同步
- 条件变量
- 条件变量的接口
- 简单实现一个生产者和一个消费者的模型
- pthread_cond_wait
- 该接口为啥要使用互斥锁
- 等待接口的调用互斥锁的逻辑
- 等待接口的逻辑中,为啥要把调用者的PCB放到等待队列后才释放互斥锁
- 多生产者多消费者模型
- 源码
同步
同步保证了每个执行流可以合理的访问每个临界资源
- 同步的作用
当存在临界资源的时候,就直接获取临界资源;没有资源的时候,进行线程等待,等待其他线程让出临界资源,然后再通知等待线程。
等待线程之间再进行抢占式执行。
条件变量
当一个线程互斥的访问一个变量的时候,可能会出现在其他线程改变状态之前,他自己只能进行等待状态。
如果一个线程的任务是访问一个队列,当他发现这个队列是空的时候,他只能进行等待。等到其他线程将一个节点加入到这个队列中,然后通知等待队列(信号通知),这就要用到条件变量。
- 实现原理
PCB等待队列 + 2个接口(等待接口,唤醒接口)
- 执行流程
1.先判断当前线程有无资源可以利用,没有资源就进行调用等待接口进行线程等待,然后这个等待的线程就会被放在PCB等待队列中挂起等待,直到有其他线程来唤醒PCB等待队列中的线程。
2.当一个执行流生产了一个资源之后,调用唤醒接口。该唤醒接口的执行流就会通知PCB等待队列中被挂起的执行流,通知他们可以去访问临界资源,这些执行流彼此之间再进行抢占式执行
条件变量的接口
1.条件变量的定义
pthread_cond_t
结构体
2.初始化条件变量
#include <pthread.h>
//动态初始化 -- 需要调用销毁接口
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
- cond:传入条件变量的变量地址
- attr:条件变量的属性,一般设置为NULL,让操作系统来默认分配
//静态初始化 -- 不需要调用销毁接口
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
3.等待接口
#include <pthread.h>
//定时等待
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
//一直等待
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- cond:传入条件变量的变量地址
- mutex:传入互斥锁变量的地址
4.唤醒接口
#include <pthread.h>
//唤醒PCB等待队列中所有执行流
int pthread_cond_broadcast(pthread_cond_t *cond);
//唤醒PCB等待队列中至少一个执行流
int pthread_cond_signal(pthread_cond_t *cond);
- cond:传入条件变量的变量地址
5.销毁条件变量
#include <pthread.h>int pthread_cond_destroy(pthread_cond_t *cond);
可以释放掉添加变量的内存,防止内存泄漏。
简单实现一个生产者和一个消费者的模型
程序所需要的变量和函数
- 临界资源:g_resource
- 一把互斥锁 lock,一个临界资源 cond
- 生产者线程 tid[1],消费者线程 tid[0]
- 生产者线程任务函数 ProductStart(),消费者线程任务函数 ConsumeStart()
流程图:
代码:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>//0 -- 当前没有临界资源 1 -- 当前有临界资源
int g_resource = 0; //定义一个临界资源 //定义互斥锁和条件变量
pthread_mutex_t lock;//互斥锁
pthread_cond_t cond;//条件变量
//消费者线程任务
void* ConsumeStart(void* arg)
{(void*)arg;//让消费者一直消费 while(1){//上锁,满足互斥条件,同一时间,一个临界资源只能被生成或者消费 pthread_mutex_lock(&lock);if(g_resource == 0)//如果没有临界资源 {//进行阻塞等待,阻塞等待的过程//1.把该线程的PCB放到PCB等待队列中 //2.取消该线程加的锁//3.等待进程被唤醒pthread_cond_wait(&cond,&lock);}sleep(1);g_resource--;//进行消费,每次-1 printf("i am consum pthread, now g_resource = [%d]\n",g_resource);pthread_mutex_unlock(&lock);//解锁 pthread_cond_signal(&cond);//通知生产者线程可以生产资源了 }return NULL;
}
//生产者线程任务
void* ProductStart(void* arg)
{(void*)arg;while(1)//生产者线程一直生产 {pthread_mutex_lock(&lock);//互斥条件,上锁 if(g_resource == 1)//如果有资源,生产者线程进入等待过程 {pthread_cond_wait(&cond,&lock);}sleep(1);g_resource++; //进行生产,每次+1 printf("i am Product thread, now g_resource = [%d]\n",g_resource);pthread_mutex_unlock(&lock);pthread_cond_signal(&cond);//通知消费者线程进行消费 }return NULL;
}int main()
{//初始化互斥锁和条件变量pthread_mutex_init(&lock,NULL);pthread_cond_init(&cond,NULL);pthread_t tid[2];//消费者线程int ret = pthread_create(&tid[0],NULL,ConsumeStart,NULL);if(ret < 0){perror("Consume pthread_create error");return 0;}//生产者线程 ret = pthread_create(&tid[1],NULL,ProductStart,NULL);if(ret < 0){perror("Product pthread_create error");return 0;}int i = 0;//两个线程进行等待 for(i = 0; i < 2; i++){pthread_join(tid[i],NULL);}//销毁互斥锁和条件变量 pthread_mutex_destroy(&lock);pthread_cond_destroy(&cond);return 0;
}
运行结果
可以发现是生产者生产一个资源,消费者就消费一个资源,一个生产者和一个消费者的模型就简单构造完毕了。
注意
- 模拟的资源是一个临界资源,在访问的时候需要满足互斥条件,就需要在访问的时候进行加锁和解锁,从而满足互斥。
- 如果不满足消费或者生产的条件的时候,就需要让这个线程进行等待,调用 pthread_cond_wait(&cond,&lock); 函数。在这个函数内部,会先把这个线程加入到PCB等待队列中,再对互斥锁进行解锁操作以便另一个线程可以互斥的访问临界资源,在解锁之后这个线程就陷入等待状态了。一直到该线程被唤醒后利用保存的程序计数器和上下文信息来恢复到之前的状态,再继续执行。
- 在生产和消费操作(+1,-1)之后,该线程的任务就是唤醒PCB等待队列中的另一个线程,让他进行消费或者生产操作,防止线程阻塞。如果生产者线程在生产完之后,不通知PCB等待队列去唤醒这个消费者线程,那么由于线程之间的抢占式执行,可能消费者线程此时还在等待状态(等待生产者生产资源),而生产者此时已经生产完资源,在他的想法中,已经有了资源,自己就等待消费了。此时生产者等待消费,消费者等待生产,就陷入了一个阻塞状态。
pthread_cond_wait
函数接口
pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)
该接口为啥要使用互斥锁
等待接口的作用就是让当前执行流进行等待,然后让其他执行流去访问临界资源,而临界资源在访问的时候,需要同时满足同步和互斥条件,否则就会出现二义性。
如果不使用互斥锁而单使用一个条件变量,也就是不满足互斥,只满足了同步,相当于不同的执行流在同一时刻可以访问临界资源。
所以说,在条件变量等待接口中需要用互斥锁来保证互斥属性,从而保证其他执行流在同一时刻访问临界资源的时候,只有一个执行流在访问临界资源。
等待接口的调用互斥锁的逻辑
- 将调用者线程的PCB放到PCB等待队列中
- 对该互斥锁进行解锁操作
- 等待其他线程再调用完临界资源后通知PCB等待队列,从而唤醒该执行流(唤醒执行流的情况需要看调用的是哪一个唤醒接口),把PCB等待队列中执行流移出PCB等待队列,然后进行抢锁操作。
这里又有两种情况:(无论抢没抢到互斥锁,执行流都已经移出了PCB等待队列)
- 没有抢到互斥锁:这个执行流如果没有抢到互斥锁,就会卡在互斥锁的逻辑当中,也就是还是陷入等待状态。
当他再次获取时间片的时候(下一次可以抢互斥锁时),就会通过上下文信息和程序计数器中恢复上一次执行时的场景,继续进行强互斥锁的逻辑,直到加锁成功,从等待接口返回。 - 抢到了互斥锁:这个执行流如果抢到了互斥锁,就拥有了重新访问该临界资源的资格,可以直接从等待接口返回,然后顺序执行代码的逻辑。
等待接口的逻辑中,为啥要把调用者的PCB放到等待队列后才释放互斥锁
这是为了防止消费者线程先释放互斥锁后,被生产者线程抢在消费者线程进入PCB等待队列之前拿到互斥锁,并且执行生产逻辑后通知PCB等待队列。这个时候,消费者线程才进入PCB等待队列中,就可能发生没有生产者线程调用唤醒接口来唤醒PCB等待队列,因为此时已经生产了资源,而发生的阻塞现象。
多生产者多消费者模型
先将上面的程序,在主函数中分别增加一个生产者线程和一个消费者线程。
程序此时所需要的变量和函数
- 临界资源:g_resource
- 一把互斥锁 lock,一个临界资源 cond
- 生产者线程 Producer_tid[],消费者线程 Consum_tid[],数组的大小都是2,THREADPRODUCER和THREADCONSUM 这两个宏来表示2
- 生产者线程任务函数 ProductStart(),消费者线程任务函数 ConsumeStart()
主函数部分
pthread_t Consum_tid[THREADCONSUM];//消费者线程pthread_t Producer_tid[THREADPRODUCE];//生产者线程int ret = 0;int i = 0;for(i = 0; i < THREADCONSUM; i++){//消费者线程ret = pthread_create(&Consum_tid[i],NULL,ConsumeStart,NULL);if(ret < 0){perror("Consume pthread_create error");return 0;}}for(i = 0; i < THREADPRODUCE; i++){//生产者者线程ret = pthread_create(&Producer_tid[i],NULL,ProductStart,NULL);if(ret < 0){perror("Product pthread_create error");return 0;}}
仅仅这样改变,就出现了不是同步访问资源的情况,消费者访问了没有的资源,很不合理。
原因:
- 两个消费者执行流先都阻塞在了等待接口,然后等到生产者线程生产完毕调用唤醒接口的时候,AB两个线程就从PCB等待队列中被释放。
- 消费者线程之前互相抢互斥锁,一个抢到,一个没抢到。假设A抢到了,然后顺序执行,最后在他释放锁资源的时候,B就拥有了时间片,然后抢到了互斥锁,继续顺序执行,此时就访问了不合理的资源
解决方案
把判断条件改为循环判断-- while
改掉之后发现问题还没有解决,程序会一直陷入等待的状态。
调用ps aux 和 pstack查看后发现线程都阻塞了
和上面的问题相比,这个现象就是多走了那一步。
- 首先,消费者A和消费者B都陷入了PCB等待队列中,然后由生产者进行生产资源,生产完毕后打印第一行数据,已经生产一个资源。
- 其次,消费者A和B被唤醒,开始抢这个互斥锁。假如抢到的是A,那么B就陷入等待中,但是此时消费者B是不在PCB等待队列中的。A顺序执行,打印消费的第二行。
- 之后A释放了锁资源,要拥有锁资源的线程开始抢占互斥锁,看打印的情况,此时应该是生产者抢到了互斥锁,然后生产资源,打印第三行。A线程释放之后,再次进入这个循环,还可以陷入等待,然后进入PCB等待队列中
- 最后释放锁资源的时候,应该是消费者抢到了这个互斥锁。此时再次判断有没有资源,发现没有,就陷入了等待逻辑。此时生产者AB,消费者AB(在PCB等待队列中,因为没有接到通知,所以他们认为当前是有资源的)都陷入了等待逻辑,程序卡死。
解决方法
自己调用自己的PCB等待队列,生产者用生产者PCB等待队列,消费者用消费者PCB等待队列
这样做法的好处
可以规避掉消费者消费完成之后,通知PCB等待队列的时候,唤醒的同样是一个消费者,导致所有线程都被卡死在等待的逻辑中。
这时生产者线程也接收不到消费者线程的唤醒接口,从而所有线程都卡死在了等待接口。
完善后打印的结果
源码
https://github.com/duchenlong/linux-text/blob/master/thread/conds.c
生产者和消费者(linux)相关推荐
- 多生产者多消费者linux内核,Linux多线程实现“生产者和消费者”
frankzfz2014-07-27 17:32 demo121:frankzfz您好: 我想请教一个问题,就是将写好的GenericApp项目(没有配置工具),我加入zigbee协议栈的配置工具后还 ...
- linux进程间通信:system V 信号量 生产者和消费者模型编程案例
生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...
- Linux系统编程:使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型
代码实现 如题,使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型.本来是想只用信号量实现生产者消费者模型的,但是发现 只能在一个生产者和一个消费者之间,要在多个生产者和消费者模 ...
- Linux并发程序课程设计报告,网络操作系统课程设计--进程机制与并发程序设计-linux下生产者与消费者的问题实现.doc...
网 络 操 作 系 统 课 程 设 计 网络操作系统课程设计 设计内容:进程机制与并发程序设计inux下生产者与消费者的问题实现进程机制与并发程序设计inux下生产者与消费者的问题实现 (1)掌握基本 ...
- Linux 生产者与消费者模型C++实现
生产者与消费者模型 本篇博客代码实现都是在linux环境下跑的 通过条件变量实现 应用场景:针对大量数据的产生与处理的场景 生产与处理是放到不同执行流中完成的,中间会增加一个数据缓冲区,作为中间的数据 ...
- linux使用线程实现生产者消费者问题,Linux下生产者与消费者的线程实现
代码见<现代操作系统> 第3版. 为了显示效果,添加了printf()函数来显示运行效果 #include #include #define MAX 20 pthread_mutex_t ...
- Linux信号量与互斥锁解决生产者与消费者问题
先来看什么是生产者消费者问题: 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问 ...
- Linux操作系统实验:生产者和消费者问题
一.实验目的及要求 "生产者消费者"问题是一个著名的同时性编程问题的集合.通过编写经典的"生产者消费者"问题的实验,读者可以进一步熟悉 Linux 中多线程编程 ...
- 【Linux下】 线程同步 生产者与消费者模型
文章目录 [Linux下] 线程同步 生产者与消费者模型 线程同步 同步概念与竞态条件 条件变量 条件变量本质 操作条件变量 初始化和销毁条件变量 等待 唤醒 通过条件变量实现的简单线程同步例子 为什 ...
最新文章
- leetcode 448. Find All Numbers Disappeared in an Array
- 嵌入式大牛常用的十大C/C++开发利器
- LNMT/LAMT实现动静分离、负载均衡和会话保持
- vue 方法回调通知执行下一个方法
- Gitlab文件管理之把文档传到指定文件夹
- 成都哪所专科院校有计算机专业,成都哪些高职院校有计算机应用技术
- 仿苹果 底部弹窗 选择列表
- 2022邮件群发软件哪个好,群发邮件软件推荐
- MaxKey单点登录认证系统微服务架构v3.0.0GA发布
- 这才叫高颜值的Markdown编辑神器!
- 智能管家App kotlin版——开发索引
- 北风修仙笔记—2020年3月
- 2023年湖北黄石初级工程师职称在哪里报名?评审条件是什么启程别
- java过滤_java 过滤list的几种方式
- 【4022】有些KPI的完成,╮(╯▽╰)╭
- 图像处理用什么神经网络,神经网络图像处理
- 新一届CMO获奖名单公布:人大附中、上海中学霸榜,深圳中学选手夺魁
- USRP B210同步采集
- Kali渗透测试之端口扫描1——UDP、TCP、僵尸扫描、隐蔽扫描
- 陌陌走向全面衰退真怪不得疫情