Linux多进程编程(2)
简介
IPC(Inter Process Communication,进程间通信)的方式总共有三种,分别是信号量、共享内存和消息队列,本文介绍前两种。
在Linux中,进程之间操作公共数据时,需要进行互斥操作,这种情况需要临界区。在《Linux高性能服务器编程》这本书中,给出了semget
、semop
和semctl
三个函数进行有关的操作。但是,Linux官方不建议使用这三个旧的函数了,而是建议使用新的API;上述三个函数是针对有名字类型的信号量进行操作,有些情况下,还需要对公共内存区进行操作,此时需要无名类型的信号量,同样的,使用官方建议的新版的API。
进程的信号量机制
PV原语操作
信号量机制和PV原语操作,是进程互斥的基础。PV原语操作是指任意时刻任何进程或者线程只能通过PV原语对信号量进行操作,操作是原子的,不能有其他进程中断当前进程的操作。
假设有信号量S,V操作是S = S + 1
, P操作是S = S - 1
;如果S = 0
,V操作阻塞,等待其他进程唤醒。最典型的应用是生产者消费者模型,生产者进行P操作,消费者V操作。
PV原语对应Linux的系统调用
在Linux中,sem_t
类型表示一个信号量,是一个特有的数据类型。
V操作对应的函数
#include <semaphore.h>
int sem_post(sem_t* sem);
需要添加-pthread
连接。sem
是信号量值,该函数使得sem
加1,如果加1后*sem > 0
而且有进程阻塞,那么会唤醒阻塞的进程。成功返回0,失败返回-1。
P操作对应的函数
#include <semaphore.h>
int sem_wait(sem_t* sem);
如果sem指向的信号量大于0,那么*sem -= 1
,而且会立刻返回,如果当前*sem == 0
,那么会阻塞,有可能使得信号量-1。
有名信号量和无名信号量区别与联系
两者都是信号量机制的,用于消息传递和互斥。
无名信号量只能存在于内存中,要求使用信号量的进程必须能访问信号量所在的这一块内存,所以无名信号量只能应用在同一进程内的线程之间(共享进程的内存),或者不同进程中已经映射相同内存内容到它们的地址空间中的线程(即信号量所在内存被通信的进程共享)。意思是说无名信号量只能通过共享内存访问。
有名信号量可以通过名字访问,因此可以被任何知道它们名字的进程中的线程使用。
单个进程中使用 POSIX 信号量时,无名信号量更简单。多个进程间使用 POSIX 信号量时,有名信号量更简单。
作者:ACool
链接:https://juejin.im/post/5b2908d2e51d4558e3600827
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
有名信号量实例
关于创建多个子进程,参考这篇博客:https://blog.csdn.net/qq_35976351/article/details/86584644
创建函数:
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <semaphore.h>sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag,mode_t mode, unsigned int value);
name
是信号量的名称,如果oflag
是O_CREAT
,而且是第一次创建,则必须使用第二个函数,mode
表示去权限,value
是信号量的初始值;如果同样的oflag
,已经存在,则使用第一个。
代码实例,父子进程使用信号量同步,父进程不使用wait
函数。
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>sem_t* sem;
const char* sem_name = "my_sem";int main() {sem = sem_open (sem_name, O_CREAT, 0660, 0);int pid = fork();if (pid < 0) {perror ("fork() error\n");return 1;} else if (pid == 0) { // 子进程sleep (3); // 续3秒puts ("child");sem_post (sem);} else { // 父进程sem_wait (sem);puts ("parent");sem_destroy(sem);}return 0;
}
无名信号量实例
无名信号的创建函数:
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);
sem
是信号量地址。pshared
是共享的标志,如果为0,表示一个进程内各个线程之间共享;不为零表示进程之间的共享。成功返回0,失败返回-1。
该函数需要和共享内存的进程共同使用,因为sem
参数必须是在 共享内存上的。
进程间共享内存
这一部分指的是无名的信号量,即共享内存。
创建共享内存:
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
key
:一个键值,表示全局唯一的共享内存size
:共享内存的大小,单位是字节shmflg
:一个标记,一般设置为O_CREAT
,具体查手册。这里说明两个特殊的:SHM_HUGETLB
:系统使用大页面为共享内存分配空间SHM_NORESERVE
:不为共享内存保留交换分区
函数成功返回正整数值,是共享内存的标识符;失败返回-1。
共享内存创建完毕,需要先关联到进程的地址空间中;使用完毕后,也需要将它从进程地址空间中分离。调用如下两个函数:
#include <sys/shm.h>
void* shmat(int shm_id, const void* shm_addr, int shmflg); // 关联函数
int shmdt(const void* shm_addr); // 分离函数
shm_id
:由shmget
返回的共享内存标识符shm_addr
:如果是NULL
,关联的地址由操作系统决定,推荐这么做;非空的情况比较复杂,查找手册即可。
系统使用shmctl
操作共享内存的属性,具体参考手册。
#include <sys/shm.h>
int shmctl(int shm_id, int command, struct shmid_ds* buf);
共享内存的POSIX方法:
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_open(const char* name, int oflag, mode_t mode);
int shm_unlink(const char* name);
类似于mmap
方法,不过这里是具名共享内存。name
是共享内存的名称,oflag
是创建方式,具体参照手册,成功返回文件描述符,失败返回-1。第二个是取消连接,name
同上。
共享内存的多进程聊天室代码实例:
#include <sys/shm.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>#define USER_LIMIT 32
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536struct client_data {sockaddr_in address; // 客户端地址int connfd; // socket文件描述符pid_t pid; // 客户端进程的pidint pipefd[2]; // 和父进程通信的管道
};static const char* shm_name = "my_shm";
int sig_pipefd[2];
int epollfd = -1;
int listenfd = -1;
int shmfd = -1;
char* share_mem = 0;
client_data* users = 0; // 客户连接数据,进程客户连接的来索引数据
int* sub_process; // 子进程和客户连接的映射关系表,用进程的pid缩影这个数组
int user_count = 0; // 当前客户的数量
bool stop_child = false;int setnonblocking (int fd) {int old_option = fcntl (fd, F_GETFL);int new_option = old_option | O_NONBLOCK;assert (fcntl (fd, F_SETFL, new_option) != -1);return old_option;
}void addfd (int epollfd, int fd) {epoll_event event;bzero (&event, sizeof (event) );event.data.fd = fd;event.events |= EPOLLIN | EPOLLET;assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, &event) != -1);setnonblocking (fd);
}void sig_handler (int sig) {int save_errno = errno;int msg = sig;send (sig_pipefd[1], (char*) &msg, sizeof (char), 0);errno = save_errno;
}void addsig (int sig, void (*handler) (int), bool restart = true) {struct sigaction sa;bzero (&sa, sizeof (sa) );sa.sa_handler = handler;if (restart) {sa.sa_flags |= SA_RESTART;}sigfillset (&sa.sa_mask);assert (sigaction (sig, &sa, NULL) != -1);
}void del_resource() {close (sig_pipefd[0]);close (sig_pipefd[1]);close (listenfd);close (epollfd);shm_unlink (shm_name);delete[] users;delete[] sub_process;
}// 停止一个子进程
void child_term_handler (int sig) {stop_child = true;
}// 子进程运行的函数,idx指出子进程处理的客户连接的编号,
// users保存客户连接数据的数组
// share_mem支持共享内存的起始地址
int run_child (int idx, client_data* users, char* share_mem) {epoll_event events[MAX_EVENT_NUMBER];memset (events, 0, sizeof (events) );// 子进程IO复用同时监听两个文件描述符,// 分别是客户连接socket和与父进程通信的管道文件描述符int child_epollfd = epoll_create1 (0);assert (child_epollfd != -1);int connfd = users[idx].connfd;int pipefd = users[idx].pipefd[1];addfd (child_epollfd, pipefd);int ret;addsig (SIGTERM, child_term_handler, false);while (!stop_child) {int number = epoll_wait (child_epollfd, events, MAX_EVENT_NUMBER, -1);if (number < 0) {perror ("epoll failure\n");break;}for (int i = 0; i < number; ++i) {auto sockfd = events[i].data.fd;auto event = events[i].events;if (sockfd == connfd && event & EPOLLIN) {memset (share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE);ret = recv (connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0);if (ret < 0) {if (errno != EINTR) {stop_child = true;}} else if (ret == 0) {stop_child = true;} else {// 通知主进程处理数据send (pipefd, (char*) &idx, sizeof (idx), 0);}} else if (sockfd == pipefd && event & EPOLLIN) {int client = 0;ret = recv (sockfd, (char*) &client, sizeof (client), 0);if (ret < 0) {if (errno != EAGAIN) {stop_child = true;}} else if (ret == 0) {stop_child = true;} else {send (connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0);}} else {continue;}}}close (connfd);close (pipefd);close (child_epollfd);return 0;
}int main (int argc, char* argv[]) {if (argc != 2) {perror ("Usage: %s <port> of server\n");return 1;}int port = atoi (argv[1]);if (port < 1024 || port > 65535) {perror ("port error\n");return 1;}struct sockaddr_in address;bzero (&address, sizeof (address) );address.sin_family = AF_INET;address.sin_port = htons (port);address.sin_addr.s_addr = htonl (INADDR_ANY);listenfd = socket (AF_INET, SOCK_STREAM, 0);if (listenfd < 0) {perror ("socket() error\n");return 1;}setnonblocking (listenfd); // 非阻塞监听int ret = bind (listenfd, (struct sockaddr*) &address, sizeof (address) );if (ret < 0) {perror ("bind() error\n");close (listenfd);return 1;}ret = listen (listenfd, 32);if (ret < 0) {perror ("listen() error\n");return 1;}user_count = 0;users = new client_data[USER_LIMIT + 1];sub_process = new int[PROCESS_LIMIT];memset (users, -1, USER_LIMIT * sizeof (int) );memset (sub_process, -1, PROCESS_LIMIT * sizeof (int) );epoll_event events[MAX_EVENT_NUMBER];memset (events, 0, sizeof (events) );epollfd = epoll_create1 (0);if (epollfd < 0) {perror ("epoll_create1() error\n");close (listenfd);return 1;}addfd (epollfd, listenfd);ret = socketpair (AF_UNIX, SOCK_STREAM, 0, sig_pipefd);if (ret < 0) {perror ("sockerpair() error\n");close (epollfd);close (listenfd);return 1;}setnonblocking (sig_pipefd[1]);addfd (epollfd, sig_pipefd[0]);addsig (SIGCHLD, sig_handler);addsig (SIGTERM, sig_handler);addsig (SIGINT, sig_handler);addsig (SIGPIPE, sig_handler);bool stop_server = false;bool terminate = false;shmfd = shm_open (shm_name, O_CREAT | O_RDWR, 0666);if (shmfd < 0) {perror ("shm_open() error\n");close (epollfd);close (listenfd);return 1;}share_mem = (char*) mmap (NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ |PROT_WRITE, MAP_SHARED, shmfd, 0);if (share_mem == MAP_FAILED) {perror ("share_mem() error\n");close (epollfd);close (listenfd);close (shmfd);return 1;}while (!stop_server) {int number = epoll_wait (epollfd, events, MAX_EVENT_NUMBER, -1);if (number < 0 && errno != EINTR) {perror ("epoll_wait() error\n");break;}for (int i = 0; i < number; ++i) {auto sockfd = events[i].data.fd;auto event = events[i].events;if (sockfd == listenfd) {struct sockaddr_in client_address;bzero (&address, sizeof (address) );socklen_t client_addrlen = sizeof (client_address);int connfd = accept (listenfd, (struct sockaddr*) &client_address,&client_addrlen);if (connfd < 0) {printf ("errno is: %d\n", errno);continue;}if (user_count >= USER_LIMIT) {const char* info = "Too many users...";printf ("%s\n", info);send (connfd, info, strlen (info), 0);close (connfd);continue;}// 保存第user_count个客户连接的相关数据users[user_count].address = client_address;users[user_count].connfd = connfd;// 父子进程建立管道,传递必要的数据ret = socketpair (PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);if (ret < 0) {perror ("socketpair() error\n");close (connfd);continue;}pid_t pid = fork();if (pid < 0) {close (connfd);continue;} else if (pid == 0) {close (epollfd);close (listenfd);close (users[user_count].pipefd[0]);close (sig_pipefd[0]);close (sig_pipefd[1]);run_child (user_count, users, share_mem);munmap ( (void*) share_mem, USER_LIMIT * BUFFER_SIZE);exit (0);} else {close (connfd);close (users[user_count].pipefd[1]);addfd (epollfd, users[user_count].pipefd[0]);users[user_count].pid = pid;// 记录新的客户连接在数组users中的索引值,// 建立进程pid和该索引值之间的映射关系sub_process[pid] = user_count;++user_count;}} else if (sockfd == sig_pipefd[0] && event & EPOLLIN) {// 处理信号事件char signals[1024];memset (signals, 0, sizeof (signals) );ret = recv (sig_pipefd[0], signals, sizeof (signals), 0);if (ret <= 0) {continue;} else {for (int i = 0; i < ret; ++i) {switch (signals[i]) {case SIGCHLD: // 子进程退出,有客户端断开连接pid_t pid;int stat;while ( (pid = waitpid (-1, &stat, WNOHANG) ) > 0) {int del_user = sub_process[pid];sub_process[pid] = -1;if (del_user < 0 || del_user > USER_LIMIT) {continue;}// 清除请求的相关数据epoll_ctl (epollfd, EPOLL_CTL_DEL, \users[del_user].pipefd[0], 0);close (users[del_user].pipefd[0]);users[del_user] = users[--user_count];sub_process (users[del_user].pid) = del_user;}if (terminate && user_count == 0) {stop_server = true;}break;}case SIGTERM:case SIGINT: {// 结束服务器程序puts ("kill all child now");if (user_count <= 0) {stop_server = true;break;}for (int i = 0; i < user_count; ++i) {int pid = users[i].pid;kill (pid, SIGTERM);}terminate = true;break;}default:break;}}} else if (event & EPOLLIN) { // 写入数据int child = 0;ret = recv (sockfd, (char*) &child, sizeof (child), 0);if (ret <= 0 ) {continue;} else {for (int j = 0; j < user_count; ++j) {if (users[j].pipefd[0] != sockfd) {printf ("send data to child accross pipe\n");send (users[j].pipefd[0], (char*) &child, sizeof (child), 0);}}}}}}del_resource();return 0;
}
Linux多进程编程(2)相关推荐
- Linux -- 多进程编程之 - 守护进程
内容概要 一.守护进程概述 二.守护进程创建 2.1.创建子进程,父进程退出 2.2.在子进程中创建新会话 2.2.1.进程组和会话期 2.2.2.setsid()函数说明 2.3.改变当前工作目录 ...
- LINUX 多进程编程 C语言实例
LINUX多进程编程 简单实例 1.ps与top命令 查看进程状态 2.系统调用ping,并执行 #include <stdio.h> #include <string.h> ...
- Linux多进程编程之在线词典
在线词典是基于Linux 多进程并发服务器编程,由服务器端和客户端构成,客户端可以运行在多个不同的主机上连接服务器,服务器对员工信息的操作结果通过数据库sqlite来保存.当用户登录后,根据用户名判断 ...
- linux多进程编程(一)
2019独角兽企业重金招聘Python工程师标准>>> 最近因为一个偶然的原因要在linux平台上做一个模拟实验,其中要涉及到多进程的编程,所以特此写一系列的博客来总结一下多进程的编 ...
- Linux多进程编程
fork系统调用 #include <sys/types.h> #include <unistd.h>/* Clone the calling process, creatin ...
- Linux多进程编程之 孤儿进程僵尸进程+wait函数
我们可否想过一个问题:使用fork()函数创建子进程,因为父进程和子进程的执行顺序是随机的 当父进程已经结束了,子进程还会继续存在并正常执行吗? 我们先看这个例子: guer1.c #include& ...
- linux多进程编程计算圆周率,中值积分定理计算PI值的多线程实现
// Parallel.cpp : 定义控制台应用程序的入口点. // #include "stdafx.h" #include #include static long num_ ...
- Linux多进程编程(1)
fork函数与exec系列函数系统调用 fork系统调用用于产生一个子进程,下面是基础用法: #include <sys/types.h> #include <unistd.h> ...
- 2022-1-16牛客C++项目——Linux多进程编程——进程间通信
常见的面试题: 1.你知道进程间通信的方式有哪几种吗? 2.这个通信方式具体的实现原理是什么?怎么实现的? 复习时候需要使用的问题: 1.进程间的通信是什么?进程间为什么需要通信? 2.进程间通信的目 ...
最新文章
- 这种事都有?建行网银把Demo版的放上线了?!
- Maltego发布新版本4.2.18
- MyBatis MapperScannerConfigurer配置——MyBatis学习笔记之八
- activiti7可以两个网关连着用吗
- java黑色_java-透明的黑色圆圈
- onlyoffice 收费不_西班牙银行开始泛滥收费,柜面取钱也要手续费
- iftop 流量监控
- 分布式代码管理系统GIT
- Mysql 分组后组内排序按字段取最大或最小的数据
- 即将改变软件开发的5个Java9新特性
- 算法(第4版) Chapter 5.2 单词查找树
- UIImagePickerController
- 央行超级网银8月上线 第三方支付平台或暂停接入
- stm32的HAL库uart的注意点
- html refresh原理,HTML meta refresh 刷新与跳转(重定向)页面
- 计算机经常断开网络,怎么解决电脑经常自动断网掉线的问题
- B站被骂上了热搜。。
- 重磅消息:微信支付分最新开通方法!
- 微信公众号基础功能搭建
- TN.STN液晶屏常见问题及解决办法
热门文章
- java server 参数_java serversocket参数详解
- WampServer安装教程
- Python之 break退出循环
- Youki的C++命名规则
- 【less-24】基于SQLI的二次注入
- 使用 docker 命令不用加 sudo
- 【Qt教程】2.3 - Qt5 控件 - 按钮组(QPushButton、QToolButton、QRadioButton、QCheckBox)资源编辑器导入资源
- LeetCode 154. 寻找旋转排序数组中的最小值 II (二分)
- 【MySql】100问
- CSDN10大博客栏目火热评选中