Linux多进程实现生产者消费者问题
1. 任务简介
生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个著名的进程同步问题的经典案例。它描述的是有一组生产者进程在生产产品,并将这些产品提供给一组消费者进程去消费。为使生产者进程和消费者进程能够并发执行,在这两者之间设置里一个具有nnn个缓冲区的缓冲池,生产者进程将他所生产的的产品放入一个缓冲区中;消费者进程可从一个缓冲区中取走产品并进行消费。尽管所有的生产者进程和消费者进程都是以异步方式运行的,但亡们之间必须保特同步,即不允许消费者进程到一个空缓冲区中去取产品;也不允许生产者进程向一个已装满产品且产品尚未被取走的缓冲区投放产品。
本项目要求利用Linux多进程实现生产者消费者问题。
2. 思路分析
我们分析题目中的同步和互斥关系:
2.1 同步关系
- 当缓冲区有空位时,生产者进程才可以生产
- 当缓冲区有产品是,消费者进程才可以消费
2.2 互斥关系
- 生产者进程与消费者进程对缓冲区的访问是互斥的
2.3 整体思路
总体思路如下:
- 设置一个生产者进程,负责生产产品
- 设置一个消费这进程,负责消费产品
- 生产者与消费者进程间的通讯通过共享内存实现
- 设置一个互斥信号量,实现对共享内存的互斥访问
- 设置两个信号量,用于标记资源的数目,实现进程间的两个同步关系
具体流程如下图所示:
3. 代码实现
3.1 头文件
首先,我们包含实现问题所需的头文件:
#include <time.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/ipc.h>
3.2 预定义和数据结构
- 我们采用共同协商关键字
SEMKEY
,SHMKEY
的方法使得不同进程间可以取得同一个信号量和共享内存 - 定义了一个结构体
Buffer
来作为缓冲池存储产品
#define SEMKEY 123
#define SHMKEY 456
#define BUFNUM 10
#define SEMNUM 3#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/* union semun is defined by including <sys/sem.h> */
#else
/* according to X/OPEN we have to define it ourselves */
union semun
{int val;struct semid_ds *buf;unsigned short *array;
};
#endifstruct Buffer
{int start, end;char buffer[BUFNUM];
};
3.3 初始化函数
- 我们利用协商好的
SEMKEY
生成一个信号量集,其中第一个信号量为empty
,表示缓冲池为空的个数;第二个信号量为full
,表示缓冲池中的产品个数;第三个信号量为mutex
,控制对缓冲池的读取权限。 - 利用协商好的
SHMKEY
生成一个共享内存集 - 我们利用
*returnSemId
,*returnShmId
和**returnShm
三个指针来返回初始化的参数
具体实现如下:
void Initialize(int *returnSemId, int *returnShmId, struct Buffer **returnShm)
{int semId = -1, shmId = -1, values[SEMNUM] = {BUFNUM, 0, 1};/* semSet[0]: empty, initial value: nsemSet[1]: full, initial value 0semSet[2]: mutex, initial value 1 */semId = semget(SEMKEY, SEMNUM, IPC_CREAT | 0666);if(semId == -1){printf("semaphore creation failed!\n");exit(EXIT_FAILURE);}int i = 0;union semun semUn;for( i = 0; i < SEMNUM; i ++){semUn.val = values[i];if(semctl(semId, i, SETVAL, semUn) < 0){printf("semaphore %d initialization failed!\n", i);exit(EXIT_FAILURE);}}shmId = shmget(SHMKEY, sizeof(struct Buffer), IPC_CREAT | 0666);if(shmId == -1){printf("share memory creation failed!\n");exit(EXIT_FAILURE);}void *temp = NULL;struct Buffer *shm = NULL;temp = shmat(shmId, 0, 0);if(temp == (void *) -1){printf("share memory attachment failed!\n");exit(EXIT_FAILURE); }shm = (struct Buffer *) temp;shm -> start = 0;shm -> end = 0;for(i = 0; i < BUFNUM; i++){shm -> buffer[i] = ' ';}*returnSemId = semId;*returnShmId = shmId;*returnShm = shm;
}
3.4 PV操作
给定信号量集的semId
以及待操作的信号量下标semNum
,其P
操作和V
如下所示:
void SemWait(int semId, int semNum)
{struct sembuf semBuf;semBuf.sem_num = semNum;semBuf.sem_op = -1;semBuf.sem_flg = SEM_UNDO;if(semop(semId, &semBuf, 1) == -1){printf("semaphore P operation failed!\n");exit(EXIT_FAILURE);}
}void SemSignal(int semId, int semNum)
{struct sembuf semBuf;semBuf.sem_num = semNum;semBuf.sem_op = 1;semBuf.sem_flg = SEM_UNDO;if(semop(semId, &semBuf, 1) == -1){printf("semaphore V operation failed!\n");exit(EXIT_FAILURE);}
}
3.5 生产者进程
生产者首先申请一个空闲缓冲区资源,再申请临界缓冲区访问。当产生一个产品后,发送一个信号,使得已有缓冲区资源数量加一,同时唤醒阻塞的消费者进程,具体代码如下:
void Producer(int semId, struct Buffer *shm)
{do{// wait empty regionSemWait(semId, 0);// wait mutexSemWait(semId, 2);Add(shm);// signal mutexSemSignal(semId, 2);// singal full regionSemSignal(semId, 1);sleep(random() % 2);}while(1);
}
执行Add
操作时,随机产生一个大写英文字母模拟产品,放入缓冲区,同时调整队尾指针end
,具体代码如下:
void Add(struct Buffer *shm)
{char product = 'A' + rand() % 26;printf("producer %d: added product %c into buffer:\t", getpid(), product);shm -> buffer [shm -> end] = product;shm -> end = (shm -> end + 1) % BUFNUM;printf("|%s|\n", shm -> buffer);
}
3.6 消费者进程
消费者首先申请一个已有缓冲区资源,再申请临界缓冲区访问。当消费一个产品后,发送一个信号,使得空闲缓冲区资源数量加一,同时唤醒阻塞的生产者进程,具体代码如下:
void Producer(int semId, struct Buffer *shm)
{do{// wait empty regionSemWait(semId, 0);// wait mutexSemWait(semId, 2);Add(shm);// signal mutexSemSignal(semId, 2);// singal full regionSemSignal(semId, 1);sleep(random() % 2);}while(1);
}
执行Remove
操作时,将当前缓冲区资源清空,同时调整队首指针start
,具体代码如下:
void Remove(struct Buffer *shm)
{char product = shm -> buffer [shm -> start];printf("consumer %d: removed product %c from buffer:\t", getpid(), product);shm -> buffer [shm -> start] = ' ';shm -> start = (shm -> start + 1) % BUFNUM;printf("|%s|\n", shm -> buffer);
}
3.7 主函数
从控制台通过-n
命令读入产生生产者和消费者进程的数目,首先初始化变量,之后通过fork()
产生等量的生产者和消费者进程。
注意:此处主进程在产生完其他子进程之后不能够直接退出,否则子进程会修改父进程为
systemd
,试的我们无法通过控制台的ctrl + c
命令结束程序。
int main(int argc, char *argv[])
{int semId = -1, shmId = -1, i=0;int processNum = atoi(argv[2]);if(processNum <= 0) processNum = 1;struct Buffer *shm = NULL;Initialize(&semId, &shmId, &shm);for(i = 0; i < 2 * processNum; i ++){pid_t pid = fork();if(pid < 0){printf("fork failed!\n");exit(EXIT_FAILURE);}else if(pid == 0){sleep(1);if(i % 2 == 0){printf("producer process %d created\n", getpid());Producer(semId, shm); }else{printf("consumer process %d created\n", getpid());Consumer(semId, shm);}return 0;}}getchar();Destroy(semId, shmId, shm);return 0;
}
3.8 实验代码
完整实验代码如下:
#include <time.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/ipc.h>#define SEMKEY 123
#define SHMKEY 456
#define BUFNUM 10
#define SEMNUM 3#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/* union semun is defined by including <sys/sem.h> */
#else
/* according to X/OPEN we have to define it ourselves */
union semun
{int val;struct semid_ds *buf;unsigned short *array;
};
#endifstruct Buffer
{int start, end;char buffer[BUFNUM];
};void Initialize(int *returnSemId, int *returnShmId, struct Buffer **returnShm)
{int semId = -1, shmId = -1, values[SEMNUM] = {BUFNUM, 0, 1};/* semSet[0]: empty, initial value: nsemSet[1]: full, initial value 0semSet[2]: mutex, initial value 1 */semId = semget(SEMKEY, SEMNUM, IPC_CREAT | 0666);if(semId == -1){printf("semaphore creation failed!\n");exit(EXIT_FAILURE);}int i = 0;union semun semUn;for(i = 0; i < SEMNUM; i ++){semUn.val = values[i];if(semctl(semId, i, SETVAL, semUn) < 0){printf("semaphore %d initialization failed!\n", i);exit(EXIT_FAILURE);}}shmId = shmget(SHMKEY, sizeof(struct Buffer), IPC_CREAT | 0666);if(shmId == -1){printf("share memory creation failed!\n");exit(EXIT_FAILURE);}void *temp = NULL;struct Buffer *shm = NULL;temp = shmat(shmId, 0, 0);if(temp == (void *) -1){printf("share memory attachment failed!\n");exit(EXIT_FAILURE); }shm = (struct Buffer *) temp;shm -> start = 0;shm -> end = 0;for(i = 0; i < BUFNUM; i++){shm -> buffer[i] = ' ';}*returnSemId = semId;*returnShmId = shmId;*returnShm = shm;
}void Add(struct Buffer *shm)
{char product = 'A' + rand() % 26;printf("producer %d: added product %c into buffer:\t", getpid(), product);shm -> buffer [shm -> end] = product;shm -> end = (shm -> end + 1) % BUFNUM;printf("|%s|\n", shm -> buffer);
}void Remove(struct Buffer *shm)
{char product = shm -> buffer [shm -> start];printf("consumer %d: removed product %c from buffer:\t", getpid(), product);shm -> buffer [shm -> start] = ' ';shm -> start = (shm -> start + 1) % BUFNUM;printf("|%s|\n", shm -> buffer);
}void ShmDestroy(int semId, struct Buffer * shm)
{if(shmdt(shm) < 0){printf("share memory detachment failed!\n");exit(EXIT_FAILURE);} if(shmctl(semId, IPC_RMID, 0) < 0){printf("share memory destruction failed!\n");exit(EXIT_FAILURE); }
}void SemWait(int semId, int semNum)
{struct sembuf semBuf;semBuf.sem_num = semNum;semBuf.sem_op = -1;semBuf.sem_flg = SEM_UNDO;if(semop(semId, &semBuf, 1) == -1){printf("semaphore P operation failed!\n");exit(EXIT_FAILURE);}
}void SemSignal(int semId, int semNum)
{struct sembuf semBuf;semBuf.sem_num = semNum;semBuf.sem_op = 1;semBuf.sem_flg = SEM_UNDO;if(semop(semId, &semBuf, 1) == -1){printf("semaphore V operation failed!\n");exit(EXIT_FAILURE);}
}void SemDestroy(int semId)
{union semun semUn;if(semctl(semId, 0, IPC_RMID, semUn) < 0){printf("semaphore destruction failed!\n");exit(EXIT_FAILURE);}
}void Destroy(int semId, int shmId, struct Buffer *shm)
{SemDestroy(semId);ShmDestroy(shmId, shm);printf("destruction finished! exit\n");
}void Producer(int semId, struct Buffer *shm)
{do{// wait empty regionSemWait(semId, 0);// wait mutexSemWait(semId, 2);Add(shm);// signal mutexSemSignal(semId, 2);// singal full regionSemSignal(semId, 1);sleep(random() % 2);}while(1);
}void Consumer(int semId, struct Buffer *shm)
{do{// wait full regionSemWait(semId, 1);// wait mutexSemWait(semId, 2);Remove(shm);// signal mutexSemSignal(semId, 2);// singal empty regionSemSignal(semId, 0);sleep(random() % 2);}while(1);
}int main(int argc, char *argv[])
{int semId = -1, shmId = -1, i=0;int processNum = atoi(argv[2]);if(processNum <= 0) processNum = 1;struct Buffer *shm = NULL;Initialize(&semId, &shmId, &shm);for(i = 0; i < 2 * processNum; i ++){pid_t pid = fork();if(pid < 0){printf("fork failed!\n");exit(EXIT_FAILURE);}else if(pid == 0){sleep(1);if(i % 2 == 0){printf("producer process %d created\n", getpid());Producer(semId, shm); }else{printf("consumer process %d created\n", getpid());Consumer(semId, shm);}return 0;}}getchar();Destroy(semId, shmId, shm);return 0;
}
4. 实验结果
我们通过gcc
编译器编译源程序producer_consumer.c
,生成目标文件producer_consumer
4.1 单个生产者消费者
我们从控制台输入命令$ ./producer_consumer -n 1
,来模拟一个生产者和一个消费者的情况:
我们可以清楚的看到一个生产者一个消费者进程存在时的状况。
4.2 多个生产者消费者
同理,我们从控制台输入命令$ ./producer_consumer -n 5
,来模拟多个生产者和多个消费者的情况,实验结果节选片段如下:
我们可以很轻易的通过上图所示的缓冲池可视化结果,验证我们程序的正确性,至此实验部分介绍完毕。
Linux多进程实现生产者消费者问题相关推荐
- 多进程实现生产者消费者
1 # 多进程实现生产者消费者模型 2 import multiprocessing 3 import random 4 import time 5 6 7 class Producer(multip ...
- linux多线程 消费者,linux c 多线程 生产者-消费者二
linux c 多线程 生产者--消费者2 实在不好意思,第一个版本有些问题,是局部变量和堆里面变量的区别.今天做了一下修改.代码如下. #ifndef _LIST_H_ #define _LIST_ ...
- linux 生产者消费者 多进程,Linux多线程,生产者消费者算法和条件变量的使用
接着上一篇博文,原来双线程,现在为了实现 暂停/继续 功能,又加了一个线程.第三线程使用条件信号量,当用户按下S键,第三线程将检测到,并且将ifpause置为1,然后输出线程将在if语句成立后被条件信 ...
- 在Linux系统下生产者消费者,Linux线程编程之生产者消费者问题
前言 本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并讨论编程时需要注意的事项.文中涉及的代码运行环境如下: 本文假定读者已具备线程同步的基础知识. 一 顺序表循环队列 1.1 ...
- Linux多线程实现生产者消费者进程(Linux+window代码)
原文链接:我的个人链接 Linux 常用多线程函数 pthread_create():创建一个线程 pthread_exit():退出一个线程 pthread_jion():阻塞当前线程,直到另一个线 ...
- Linux C 实现生产者消费者问题
//信号量---线程间通信 //"生产者消费者" 问题 #include<stdio.h> #include<stdlib.h> #include<u ...
- 在Linux系统下生产者消费者,生产者-消费者问题实现 (linux下C语言)
操作系统的一个经典问题是"生产者-消费者"问题, 这涉及同步信号量和互斥信号量的应用, 在这里,我用线程的同步和互斥来实现. /* * author 张文 * 2008/06/20 ...
- Linux 10:生产者消费者问题
文章目录 1. 生产者消费者 1.1 生产者消费者问题概述 1.2 生产者消费者问题优点 1.3 生产者消费者问题图解 1. 生产者消费者 1.1 生产者消费者问题概述 生产者/消费者问题,也被称 ...
- Linux进程互斥——生产者-消费者
经典的进程同步问题--生产者-消费者 模拟生产者-消费者的示例程序 示例程序代码 运行结果 改造程序,取消所有的同步机制,记录执行情况并进行分析 代码如下 运行结果 模拟生产者-消费者的示例程序 本示 ...
最新文章
- centos6.4安装java,CentOS6.4下YUM安装MySQL和JDK和Tomcat
- jquery学习之重要知识点
- 知乎改版api接口之scrapy自动登陆
- 理论基础 —— 排序 —— 直接选择排序
- CNN与MLP之间的关系,优缺点
- 马斯克开始行动:下调Twitter Blue订阅费 禁止广告
- scrapy框架爬取知乎用户
- unit 12 文档练习
- 硬盘容量统计显示WinDirStat v1.1.2.79(印心绿化版)
- 7.从Paxos到Zookeeper分布式一致性原理与实践---Zookeeper 技术内幕
- BZOJ2395 [Balkan 2011]Timeismoney 【最小乘积生成树】
- 对目录下所有库文件进行rpath更改操作的SHELL脚本
- (美)梅耶(Myers, G. J.) 等《软件测试的艺术(原书第3版)》书籍(第3版)
- Android基础入门
- 成功解决NavigationDuplicated: Avoided redundant navigation to current location:
- FDTD_谐振腔的Q值计算(2D/3D)
- workman+thinkPHP 即时通讯
- 资深电竞发烧友走心盘点,五款高续航游戏低延迟蓝牙耳机分享
- G003-186-07
- 扁平和树形结构的几种互转