背景:笔者之前一直从事嵌入式音视频相关的开发工作,对于音视频的数据的处理,生产者消费者队列必不可少,而如何实现一个高效稳定的生产者消费者队列则十分重要,不过按照笔者从业的经验,所看到的现象,不容乐观,很多知名大厂在这种基础组件的开发能力上十分堪忧。

音视频数据处理的特点:

  • 音视频数据量大:音视频数据特别是视频数据,占据了计算机数据的很大一块,不信就看看每个人的硬盘里,去除电影,照片,mp3是不是很空荡荡的。
  • 实时性要求高:音视频的延时如果大于200ms,使用体验会十分糟糕。
  • 处理流程复杂:一帧数据从sensor捕获到最终网传输出或者lcd显示,都需要经过一系列的模块进行处理。特别是网络传输,一般要经过原始数据捕获,视频数据格式转换,数据编码压缩,数据封装打包,网络传输。

生产者消费者队列在视频数据处理的必要性:

视频数据的处理为什么需要生产者消费者队列?其实上面提到的音视频数据处理的特点就是答案。

  • 一般对视频数据处理的模块都是运行在多线程中,每一个处理模块运行在一个线程中(也是软件工程分模块的思想),相互之间通过生产者消费者队列进行数据的交互,之所以用线程模型,而没有用我一直推崇的进程模型是因为线程间的共享内存比较方便,而进程则要相对复杂的多。
  • 利用多线程/多进程的并行处理能力 ,如果采用单线程单进程的单线处理模式,一帧数据从采集到输出线性经过几个模块,时效性无法保证。
  • 缓存数据,保持平滑,通过队列缓存视频数据,可以有效的去除一些数据抖动,帮助音视频数据的平滑播放,同时这个数据的缓存又不易过多,否则加大了延时,损伤实时性,所以队列大小的设置是一个平衡的艺术。

常见的生产者消费者队列实现存在的问题:

不注重效率性能:

  • 对于buffer状态的检测采用loop轮询方式。
    loop轮询是任何有追求的程序员都要避免的处理方式,而笔者以自己经历经常看到以下类似的代码:
pthread_mutex_lock();
state = check_some_state();
if (state == xxx) {do_some_process();
} else {usleep(x);
}
pthread_mutex_unlock();

以上代码loop一个状态,如果状态成立做有效的处理,不成立则睡眠一定时间,之后再次调用该段代码进行下一次的状态检测,而这个睡眠时间是一个随机经验值,很有可能下次仍然是无效的检测,接着睡眠再loop,多余的loop是一种资源的浪费。

数据的传递采用copy方式:

数据的传递,采用copy的方式,一帧数据在一个完整的处理流程中经过n次copy(笔者见过一个系统中一帧数据copy了8次之多)

生产者消费者队列和业务代码混杂在一起,没有分离:

对于开发者,都希望用最简单的接口完成某个功能,而不关心内部实现,在这里就是,只需要生产者生产数据,消费者消费数据,而内部的处理(同步,数据的处理等)完全不关心,这样开发者就不需要去弄很多锁,降低了开发难度,也可以使代码组件化,模块化。

有经验的开发者应该感觉以上都是基础点,不会有人犯这样的错误,不过笔者以自己的经历肯定的说,以上两种问题在某世界级大厂的视频设备上随处可见。

让我悲哀的是,当我指出这些问题时,某些开发者完全无动于衷。而我更无力的是,现在的cpu,memory性能实在是高,在某些不太高端的嵌入式芯片上,优化过的数据并没有十分明显,在一个实际项目上,经过优化后cpu大概降低1%(原总系统cpu占用7%),有经验的开发者又会说原7%的cpu占用率说明这个芯片做这个系统浪费了,不过也没办法其实已经用了比较低端的芯片了。。。

以上的吐槽主要是想说明:由于cpu,memory性能的提升,让很多开发者感觉软件的优化意义不大了,而我是一个理想主义者,对于某些设计ugly的代码真的是零容忍啊。

如何优化:

以上说了这么多,那如何操作呢?

  • 提高效率,去除多余的loop轮询:
    采用线程的同步机制,当状态条件符合要求时,通过通知机制触发后续处理,这样去除了无效的loop轮询检测,linux系统下可以采用条件变量,信号量来实现,我更倾向于使用条件变量,因为它是linux系统原生支持的接口。
    提到条件变量,不得不提一个概念:同步互斥,这么一个基本的操作系统概念,我最喜欢作为面试第一题,不过能用一句话切中要害的说出之间区别和各自特点的人不是很多,答不出这题的,基本上就pass了。
  • 减少多余copy:
    采用预分配的方式,将buffer分为free和active两大类,每一类buffer又切成几个小buffer,然后通过指针将两类buffer下的小buffer链接成两个链表,使用者获取buffer通过free链表获取buffer,再将buffer put到active链表上,以上都是指针的操作,没有数据的copy,极大的减少了copy操作。(再次强调指针是个好东西)
  • 模块化组件化:
    将生产者消费者队列的处理部分完全剥离成一个独立的模块组件,对外只提供几个基本的接口,内部完成同步通知的处理。

一个简单的实现:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>#include "sfifo.h"//#define CONFIG_COND_FREE 1
#define CONFIG_COND_ACTIVE 1#define MAX_SFIFO_NUM   32struct sfifo_des_s sfifo_des[MAX_SFIFO_NUM];
struct sfifo_des_s *my_sfifo_des;struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p)
{static long empty_count = 0;struct sfifo_s *sfifo = NULL;pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREEwhile (sfifo_des_p->free_list.head == NULL) {pthread_cond_wait(&(sfifo_des_p->free_list.cond), &(sfifo_des_p->free_list.lock_mutex));}
#elseif (sfifo_des_p->free_list.head == NULL) {if (empty_count++ % 120 == 0) {printf("free list empty\n");}goto EXIT;}
#endifsfifo = sfifo_des_p->free_list.head;sfifo_des_p->free_list.head = sfifo->next;EXIT:pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));return sfifo;
}int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p)
{int send_cond = 0;pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));if (sfifo_des_p->free_list.head == NULL) {sfifo_des_p->free_list.head = sfifo;sfifo_des_p->free_list.tail = sfifo;sfifo_des_p->free_list.tail->next = NULL;send_cond = 1;} else {sfifo_des_p->free_list.tail->next = sfifo;sfifo_des_p->free_list.tail = sfifo;sfifo_des_p->free_list.tail->next = NULL;}pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREEif (send_cond) {pthread_cond_signal(&(sfifo_des_p->free_list.cond));}
#endifreturn 0;
}struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p)
{struct sfifo_s *sfifo = NULL;pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVEwhile (sfifo_des_p->active_list.head == NULL) {//pthread_cond_timedwait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex), &outtime);pthread_cond_wait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex));}
#elseif (sfifo_des_p->active_list.head == NULL) {printf("active list empty\n");goto EXIT;}
#endifsfifo = sfifo_des_p->active_list.head;sfifo_des_p->active_list.head = sfifo->next;EXIT:pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));return sfifo;
}int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p)
{int send_cond = 0;pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));if (sfifo_des_p->active_list.head == NULL) {sfifo_des_p->active_list.head = sfifo;sfifo_des_p->active_list.tail = sfifo;sfifo_des_p->active_list.tail->next = NULL;send_cond = 1;} else {sfifo_des_p->active_list.tail->next = sfifo;sfifo_des_p->active_list.tail = sfifo;sfifo_des_p->active_list.tail->next = NULL;}pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVEif (send_cond) {pthread_cond_signal(&(sfifo_des_p->active_list.cond));}
#endifreturn 0;
}int dump_sfifo_list(struct sfifo_list_des_s *list)
{struct sfifo_s *sfifo = NULL;sfifo = list->head;do {printf("dump : %x\n", sfifo->buffer[0]);usleep(500 * 1000);} while (sfifo->next != NULL && (sfifo = sfifo->next));return 0;
}struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num)
{int i = 0;struct sfifo_s *sfifo;struct sfifo_des_s *sfifo_des_p;sfifo_des_p = (struct sfifo_des_s *)malloc(sizeof(struct sfifo_des_s));sfifo_des_p->sfifos_num = sfifo_num;sfifo_des_p->sfifos_active_max_num = sfifo_active_max_num;sfifo_des_p->free_list.sfifo_num = 0;sfifo_des_p->free_list.head = NULL;sfifo_des_p->free_list.tail = NULL;pthread_mutex_init(&sfifo_des_p->free_list.lock_mutex, NULL);pthread_cond_init(&sfifo_des_p->free_list.cond, NULL);sfifo_des_p->active_list.sfifo_num = 0;sfifo_des_p->active_list.head = NULL;sfifo_des_p->active_list.tail = NULL;pthread_mutex_init(&sfifo_des_p->active_list.lock_mutex, NULL);pthread_cond_init(&sfifo_des_p->active_list.cond, NULL);for (i = 0; i < sfifo_num; i++) {sfifo = (struct sfifo_s *)malloc(sizeof(struct sfifo_s));sfifo->buffer = (unsigned char *)malloc(sfifo_buffer_size);printf("sfifo_init: %x\n", sfifo->buffer);memset(sfifo->buffer, i, sfifo_buffer_size);sfifo->size = sfifo_buffer_size;sfifo->next = NULL;sfifo_put_free_buf(sfifo, sfifo_des_p);}return sfifo_des_p;
}void *productor_thread_func(void *arg)
{struct sfifo_s *sfifo;while (1) {sfifo = sfifo_get_free_buf(my_sfifo_des);if (sfifo != NULL) {printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);sfifo_put_active_buf(sfifo, my_sfifo_des);}//usleep(20*1000);}
}void *comsumer_thread_func(void *arg)
{struct sfifo_s *sfifo;int count = 0;while (1) {sfifo = sfifo_get_active_buf(my_sfifo_des);if (sfifo != NULL) {printf("---------------- get %x\n", sfifo->buffer[0]);sfifo_put_free_buf(sfifo, my_sfifo_des);}//usleep(10 * 1000);// if (count++ > 10000) {//  exit(-1);// }}
}int main()
{int ret;static pthread_t productor_thread;static pthread_t consumer_thread;struct sfifo_s *r_sfifo;my_sfifo_des = sfifo_init(10, 4096, 5);ret = pthread_create(&productor_thread, NULL, productor_thread_func, NULL);ret = pthread_create(&consumer_thread, NULL, comsumer_thread_func, NULL);while (1) {sleep(1);}return 0;
}

以上是一个简单的生产者消费者队列的c语言的实现,对应的头文件在本文底部(贴代码太长看起来很崩溃)。

仔细的同学可能会发现,以上代码sfifo_get_free_buf()中默认是loop轮询检测free buffer链表的,你前面不是说了一大堆不能loop吗?怎么还用loop呢?
这里其实有两个原因:

  • 生产者的loop是可以接受的,当发生多余loop,无法命中时,说明生产者太快,消费者太慢,而其实对于一个生产者消费者模型出现以上问题时,说明整个业务流程要重新考虑,因为正常的情况是消费者总是要快于生产者,这个业务模型才能正常的运行下去。
  • 对于有些业务模型,生产者业务模块部分是不能阻塞的,也就是说,如果free list没有数据,我们采用pthread_cond_wait()阻塞后,会导致生产者出现问题,这样最好的处理方式就是生产者模块接口返回出错,生产者业务方丢弃数据(此时就是丢帧了,这种情况如果频繁发生是不能接受了,不过也说明了消费者要有足够的能力处理生产者生产出的数据,否则整个业务都是有问题)

这个实现有那些优势:

走读和运行以上代码的同学应该可以发现这里做了一个简单可运行的demo模拟了生产者和消费者双方:

void *productor_thread_func(void *arg)
{struct sfifo_s *sfifo;while (1) {sfifo = sfifo_get_free_buf(my_sfifo_des);if (sfifo != NULL) {printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);sfifo_put_active_buf(sfifo, my_sfifo_des);}//usleep(20*1000);}
}void *comsumer_thread_func(void *arg)
{struct sfifo_s *sfifo;int count = 0;while (1) {sfifo = sfifo_get_active_buf(my_sfifo_des);if (sfifo != NULL) {printf("---------------- get %x\n", sfifo->buffer[0]);sfifo_put_free_buf(sfifo, my_sfifo_des);}//usleep(10 * 1000);// if (count++ > 10000) {//  exit(-1);// }}
}

这里面对于使用者的优点有:

  1. 接口简单:只需要get free,put active;get active,put free。
  2. 没有了数据copy,只需要操作链表上的buffer就可以了,而这些buffer的参数控制通过init接口设置。
  3. 不用再控制sleep的时间值:前面提到,在loop模型下,如果状态不成立需要sleep一段时间,再次检查,这样来控制同步状态,而这个时间值很难确定,如果时间值过长,则会导致状态检测不及时,延误数据处理,如果时间值太短,则会增加状态检测miss cache的次数,耗费更多cpu资源。而采用本模块的实现则完全不需要考虑这些问题,只需要衔接业务处理,sleep,同步,yield cpu的操作都由这个模块实现吧,完全不需要关心。
  4. 模块化,完全和业务处理无关,可以毫无压力的运用在不同的业务处理逻辑中,没有剥离代码的工作。

以上描述了一个生产者消费者队列c语言的实现,为什么是c语言版本的?因为其他高级语言,有很多成熟的库提供了该功能,完全不用自己写,而c就没这么完善了,不过这也说明了c的简单灵活。但悲哀的是很多人因此进行了很ugly的实现。
多吐槽几句,嵌入式行业由于各种技术原因,导致开发语言还是采用c,这样对开发人员有了不小的要求,而如何才能写一些优雅的代码,对人的素质有了要求,但现状是优秀的开发者都被互联网行业抢走了,导致嵌入式行业开发人员的水平参差不齐,本来应该是一个对编码能力要求很高的行业被一些水平低下的开发者占据。so,我离开了这个行业了。。。

附:模块头文件,类linux用户可通过gcc xxx.c命令build该demo,然后运行测试。

#ifndef SFIFO_H_
#define SFIFO_H_struct sfifo_list_des_s {int sfifo_num;struct sfifo_s *head;struct sfifo_s *tail;pthread_mutex_t lock_mutex;pthread_cond_t cond;
};struct sfifo_des_s {int sfifo_init;unsigned int sfifos_num;unsigned int sfifos_active_max_num;struct sfifo_list_des_s free_list;struct sfifo_list_des_s active_list;
};struct sfifo_s {unsigned char *buffer;unsigned int size;struct sfifo_s *next;
};extern struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num);/* productor */
extern struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);/* consumer */
extern struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);#endif

实现一个通用的生产者消费者队列(c语言版本)相关推荐

  1. deque实现生产者-消费者队列

    deque实现生产者-消费者队列 1.概述 使用python内置的list类型可以实现一个生产-消费队列功能,这个队列是先进先出.把批量的数据放到生产队列可以加快程序处理业务的速度,然后消费者可以从消 ...

  2. C++实现生产者消费者队列

    C++实现生产者消费者队列 分析 程序 队列的类 生产者逻辑 消费者逻辑 主函数 结果分析 源码地址 分析 首先,我们的生产者与消费者队列需要满足同步与互斥关系,就需要一把互斥锁,以及生产者与消费者各 ...

  3. 生产者消费者模型之lock版本(JAVA)

    生产者消费者模型之lock版本 package com.company.run;import java.util.LinkedList; import java.util.concurrent.loc ...

  4. 一个简单的生产者消费者案例

    生产者消费者模式 为什么要引入生产者消费者模式 简单来说就是为了解决多线程下程序先后执行问题 遇到的例:实际生产中出现的场景 服务D依赖于服务A.B.C产生的数据,服务D必须等待A.B.C产生结果并持 ...

  5. java+创建metaq生产者_微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置-Go语言中文社区...

    参考资料 windows下配置rocketMQ 解压缩 系统环境变量配置 变量名:ROCKETMQ_HOME 变量值:MQ解压路径MQ文件夹名 启动NAMESERVER Cmd命令框执行进入至'MQ文 ...

  6. 生产者消费者模型——C语言代码详解

    概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者 ...

  7. windows进程生产者消费者代码c语言,生产者消费者问题---C语言实现

    生产者消费者问题(Producer-consumer problem) 是一个多线程同步问题的经典案例. 生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程.与此同时,消费者也在缓冲区消耗 ...

  8. 生产者消费者代码c语言_由生产者消费者模型引出的线程同步问题

    由生产者消费者模型引出的线程同步问题 基本生产者消费者模型: 代码示例: 数据模型: /*** Created by IntelliJ IDEA.** @Author: ZhangDong* @Dat ...

  9. java 消费者 生产者 队列_用Java写一个生产者-消费者队列

    packageyunche.test.producer;importjava.util.LinkedList;importjava.util.Random;/*** @ClassName: Produ ...

最新文章

  1. JasperReport报表设计4
  2. php array 关联数组,php array_merge关联数组
  3. linux-security-limits
  4. combobox 怎么实现对listview的类别查询_Flutter实战之独立实现官网Demo单词收藏Demo
  5. SqlCommandBuilder
  6. [python爬虫] BeautifulSoup和Selenium对比爬取豆瓣Top250电影信息
  7. ffmpeg-win32-v3.2.4 下载_iTOP-4412开发板android4.4代码下载和编译
  8. 终止线程的三种方法(转)
  9. Dubbo源码分析系列之-整体架构设计
  10. select非group by字段的方法
  11. html 弹窗 支持ie8,浏览器兼容性的问题、支持IE8、不支持IE6、想解决这个问题、两个都支持、...
  12. python怎么读发音百度翻译-python 百度翻译破解版,亲证可行
  13. CENTOS上编译FreeSwitch
  14. Hibernate 主键
  15. JAVA虚拟机规范第八版与JAVA虚拟机规范第九版PDF资源分享
  16. 百度前员工因内网发布“女优一览表”被辞退,自诉原因:想转岗鉴黄师.........
  17. MatlabR2012a 显示使用过期的注册文件破解(.lic)
  18. \t\t无锡联通宽带最新覆盖小区名单?
  19. 拼写单词(leetcode 1160)
  20. Google Play国内应用市场发布版本步骤指导

热门文章

  1. 《Java8实战》笔记(16):结论以及Java的未来
  2. C++:MAC安装Boost库文件并且使用CLion开发
  3. 一个简单JavaAgent的实现
  4. ffmpeg的内部Video Buffer管理和传送机制
  5. IT巨头互掐云存储:Dropbox能否一马当先
  6. SQL零基础学习笔记(一)
  7. 好记性不如烂笔头,记录几个常用的Linux操作
  8. AVS 分像素运动估计优化算法
  9. 工具类:获取 spring 容器中 bean
  10. 解决: idea 修改 jsp 后,页面刷新无效