文章目录

  • 生产者消费者模型
    • 生产者消费者模型的概念
    • 生产者消费者模型的特点
    • 生产者消费者模型优点
  • 基于BlockingQueue的生产者消费者模型
    • 基于阻塞队列的生产者消费者模型
    • 模拟实现基于阻塞队列的生产消费模型

生产者消费者模型

生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。

其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

  • 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
  • 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

生产者消费者模型优点

  • 解耦。
  • 支持并发。
  • 支持忙闲不均。

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。

对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。

基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。


其与普通的队列的区别在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

模拟实现基于阻塞队列的生产消费模型

为了方便理解,下面我们以单生产者、单消费者为例进行实现。

其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现。

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>#define NUM 5template<class T>
class BlockQueue
{private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}
public:BlockQueue(int cap = NUM): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}//向阻塞队列插入数据(生产者调用)void Push(const T& data){pthread_mutex_lock(&_mutex);while (IsFull()){//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据(消费者调用)void Pop(T& data){pthread_mutex_lock(&_mutex);while (IsEmpty()){//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程}
private:std::queue<T> _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;pthread_cond_t _empty;
};

相关说明:

  • 由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可。
  • 将BlockingQueue当中存储的数据模板化,方便以后需要时进行复用。
  • 这里设置BlockingQueue存储数据的上限为5,当阻塞队列中存储了五组数据时生产者就不能进行生产了,此时生产者就应该被阻塞。
  • 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
  • 生产者线程要向阻塞队列当中Push数据,前提是阻塞队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中有空间时再将其唤醒。
  • 消费者线程要从阻塞队列当中Pop数据,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要进行等待,直到阻塞队列中有新的数据时再将其唤醒。
  • 因此在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在empty条件变量下进行等待。
  • 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。
  • 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程。
  • 同样的,当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在full条件变量下进行等待,因此当消费者消费完数据后需要唤醒在full条件变量下等待的生产者线程。

判断是否满足生产消费条件时不能用if,而应该用while:

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。

#include "BlockQueue.hpp"void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}
int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bqreturn 0;
}

相关说明:

  • 阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。
  • 代码中生产者生产数据就是将获取到的随机数Push到阻塞队列,而消费者消费数据就是从阻塞队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。

生产者消费者步调一致

由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。

小贴士:.hpp为后缀的文件也是头文件,该头文件同时包含类的定义与实现,调用者只需include该hpp文件即可。因为开源项目一般不需要进行保护,所以在开源项目中用的比较多。

生产者生产的快,消费者消费的慢

我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费。

void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}

此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列打满了,此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。

生产者生产的慢,消费者消费的快

当然,我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费。

void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}

虽然消费者消费的很快,但一开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的。

满足某一条件时再唤醒对应的生产者或消费者

我们也可以当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。

//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{pthread_mutex_lock(&_mutex);while (IsFull()){//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2){pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}pthread_mutex_unlock(&_mutex);
}
//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{pthread_mutex_lock(&_mutex);while (IsEmpty()){//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2){pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程}pthread_mutex_unlock(&_mutex);
}

我们仍然让生产者生产的快,消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。

基于计算任务的生产者消费者模型

当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。

例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个Run成员函数,该函数代表着我们想让消费者如何处理拿到的数据。

#pragma once
#include <iostream>class Task
{public:Task(int x = 0, int y = 0, int op = 0): _x(x), _y(y), _op(op){}~Task(){}void Run(){int result = 0;switch (_op){case '+':result = _x + _y;break;case '-':result = _x - _y;break;case '*':result = _x * _y;break;case '/':if (_y == 0){std::cout << "Warning: div zero!" << std::endl;result = -1;}else{result = _x / _y;}break;case '%':if (_y == 0){std::cout << "Warning: mod zero!" << std::endl;result = -1;}else{result = _x % _y;}break;default:std::cout << "error operation!" << std::endl;break;}std::cout << _x << _op << _y << "=" << result << std::endl;}
private:int _x;int _y;char _op;
};

此时生产者放入阻塞队列的数据就是一个Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。

void* Producer(void* arg)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;const char* arr = "+-*/%";//生产者不断进行生产while (true){int x = rand() % 100;int y = rand() % 100;char op = arr[rand() % 5];Task t(x, y, op);bq->Push(t); //生产数据std::cout << "producer task done" << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;//消费者不断进行消费while (true){sleep(1);Task t;bq->Pop(t); //消费数据t.Run(); //处理数据}
}

运行代码,当阻塞队列被生产者打满后消费者被唤醒,此时消费者在消费数据时执行的就是计算任务,当阻塞队列当中的数据被消费到低于一定阈值后又会唤醒生产者进行生产。

也就是说,此后我们想让生产者消费者模型处理某一种任务时,就只需要提供对应的Task类,然后让该Task类提供一个对应的Run成员函数告诉我们应该如何处理这个任务即可。

Linux生产者消费者模型相关推荐

  1. c++面试经验 | 锐捷网络嵌入式委培班 (二)Linux 生产者消费者模型

    先看代码 #include<stdio.h> #include<stdlib.h> #include<unistd.h> #include<pthread.h ...

  2. 【生产者消费者模型】

    Linux生产者消费者模型 生产者消费者模型 生产者消费者模型的概念 生产者消费者模型的特点 生产者消费者模型优点 基于BlockingQueue的生产者消费者模型 基于阻塞队列的生产者消费者模型 模 ...

  3. linux知识(二)互斥量、信号量和生产者消费者模型

    linux知识(二)互斥量.信号量和生产者消费者模型 一.互斥量 产生原因 二.信号量 生产者消费者模型 一.互斥量 产生原因 使用多线程常常会碰到数据混乱的问题,那么使用互斥量,相当于"加 ...

  4. Linux 多线程编程(实现生产者消费者模型)

    Linux 多线程编程 线程分类 线程按照其调度者可以分为用户级线程和内核级线程两种. 内核级线程 在一个系统上实现线程模型的方式有好几种,因内核和用户空间提供的支持而有一定程度的级别差异.最简单的模 ...

  5. 【Linux入门】多线程(线程概念、生产者消费者模型、消息队列、线程池)万字解说

    目录 1️⃣线程概念 什么是线程 线程的优点 线程的缺点 线程异常 线程异常 Linux进程VS线程 2️⃣线程控制 创建线程 获取线程的id 线程终止 等待线程 线程分离 3️⃣线程互斥 进程线程间 ...

  6. Linux基于单链表环形队列的多线程生产者消费者模型

    生产者–消费者模型简述 对于生产者–消费者模型,相信我们都不陌生,因为生活中,我们无时无刻不在扮演生产者或消费者.但是对于Linux中的生产者–消费者模型,大家又了解了一个什么程度? 其实,说白了就是 ...

  7. [Linux]生产者消费者模型(基于BlockQueue的生产者消费者模型 | 基于环形队列的生产者消费者模型 | 信号量 )

    文章目录 生产者消费者模型 函数调用角度理解生产者消费者模型 生活角度理解生产者消费者模型 为什么要使用生产者消费者模型 生产者消费者模型优点 321原则 基于BlockingQueue的生产者消费者 ...

  8. 【Linux篇】第十六篇——生产者消费者模型

    生产者消费者模型 生产者消费者模型的概念 生产者消费者模型的特点 生产者消费者模型优点 基于BlockingQueue的生产消费者模型 基于阻塞队列的生产者消费者模型 模拟实现基于阻塞队列的生产消费模 ...

  9. Linux多线程——生产者消费者模型

    目录 一.生产者消费者模型 1.1 什么是生成者消费者模型 1.2 生产者消费者模型的优点 1.3 基于阻塞队列实现生产者消费者模型 1.4 POSIX信号量 1.4.1 信号量概念 1.4.2 P操 ...

最新文章

  1. 计划任务执行php文件,linux系统下添加计划任务执行php文件方法
  2. main.xml Design显示不是设计界面,而是view属性的解决办法
  3. PyTorch必备神器 | 唯快不破:基于Apex的混合精度加速
  4. 安卓SQLiteOpenHelper使用说明
  5. 通过rxjs的一个例子, 来学习SwitchMap的使用方法
  6. Linux-0.11内核学习-添加系统调用
  7. CImage 获取图片RGB 、图片高和宽;
  8. 图像/视频超分之降质过程
  9. SYS/BIOS与SRIO应用实例
  10. 做一行就要把一行的本质研究透
  11. 路由器刷OPENWRT固件的方法
  12. 视频: 老罗演讲问答集锦
  13. 录屏鼠标光标圆圈如何实现_录屏鼠标光标圆圈如何实现
  14. VT处理器常用额外指令集(VMX)
  15. 原创:iPad 2第一次开机与激活指南
  16. java 谷歌地图_如何使用java在浏览器上显示谷歌地图?
  17. PS用橡皮檫檫除图形与背景颜色一样的方法
  18. 企业运维之 CDN 内容分发网络
  19. MATLAB | 用cftool对excel的数据进行三维曲线拟合
  20. 2023云南大学应用统计硕士专业考研成功经验分享

热门文章

  1. 关于STL中的greaterT()和lessT()
  2. Raptor实践参考:求圆周长
  3. data analysis (summary next step)
  4. GNSS 导航电文的解读
  5. 基于JavaSDK调用FISCO BCOS 区块链
  6. 微信转发(分享)功能
  7. 产品经理招聘分析及常见面试问题
  8. Linux内核通知链分析【转】
  9. C++ 工程实践:避免使用虚函数作为库的接口
  10. Axure从一个页面向另一个页面传值