简介

在上一篇笔记中,提到了使用信号量来处理多进程的“惊群”现象。这也不是一个好的方式,因为多进程可能同时都在处理大量的任务,导致无法及时接受连接。同时,这也无法进行负载均衡。在这里,给出传递文件描述符的方式,主进程负责接受连接,然后根据子进程的连接数量,分配连接给最少的子进程。传递文件描述符的算法参考这篇笔记。

代码实例

代码暂时有部分问题,咱这里先提供一个总体思路,具体bug留作后期修复。父进程负责接收连接,然后把连接额套接字发送给子进程。每个进程本身是一个单向管道,信号通过管道发送到进程的主循环中进程处理。主进程和子进程有一个双向管道,主进程向子进程发送新来的套接字,子进程向主进程发送客户离开的消息,从而让主进程可以实现简单的负载均衡。

#include <sys/socket.h>
#include <sys/types.h>
#include <assert.h>
#include <stdlib.h>
#include <strings.h>
#include <stdio.h>
#include <string.h>
#include <wait.h>
#include <signal.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <arpa/inet.h>
#include <vector>
#include <algorithm>
#include <unistd.h>const int MAX_WAIT_NUM = 32;
const int MAX_EVENT_NUM = 200;
const int MAX_PROCESS_NUM = 10;
const int CHILD_SIDE = 1;
const int PARENT_SIDE = 0;static const size_t CONTROL_LEN = CMSG_LEN(sizeof(int));int _epollfd;
int _pipefd[2];epoll_event _events[MAX_EVENT_NUM];struct ProcessInfo {int pipefd[2];pid_t pid;int cnt;ProcessInfo() {pid = -1;cnt = 0;pipefd[0] = pipefd[1] = -1;}
};std::vector<ProcessInfo> _processes;  // 孩子进程/** 主进程给子进程发送接收的文件描述符,使子进程可以获取客户端的连接* 子进程给主进程发送自己的进程号,表示有一个客户端断开连接,从而更新数据* */void sendfd(int fd, int fd_to_send);  // 发送fd_to_send文件描述符
int recvfd(int fd);  // 接收文件描述符
int addsig(int sig, void(*sig_handler)(int));  // sig信号添加sig_handler处理函数
int setnonblocking(int fd);  // fd设置为非阻塞的
int addepollfd(int epollfd, int fd, int events = EPOLLET | EPOLLIN);  // epollfd注册fd的事件
void send_sig_to_process(int sig);  // 信号通过_pipefd管道发送给进程
void sig_child_exit(int sig);  // 处理sigint信号,孩子进程退出
int child_process(int idx);  // 孩子进程的主函数int main(int argc, char *argv[]) {if (argc != 3) {printf("Usage: %s <port of server> <number of child process>\n", argv[0]);exit(EXIT_FAILURE);}int port = atoi(argv[1]);if (port < 1024 || port > 65535) {perror("port error\n");exit(EXIT_FAILURE);}int processNum = atoi(argv[2]);if (processNum < 1 || processNum > MAX_PROCESS_NUM) {perror("number of child process must between 1-10");exit(EXIT_FAILURE);}struct sockaddr_in serv;bzero(&serv, sizeof(serv));serv.sin_family = AF_INET;serv.sin_addr.s_addr = htonl(INADDR_ANY);serv.sin_port = htons(port);int listenfd = socket(AF_INET, SOCK_STREAM, 0);if (listenfd < 0) {perror("socket() error\n");exit(1);}setnonblocking(listenfd);if (bind(listenfd, (struct sockaddr *) &serv, sizeof(serv)) < 0) {perror("bind() error\n");exit(1);}if (listen(listenfd, MAX_WAIT_NUM) < 0) {perror("listen() error\n");exit(1);}// 建立通信管道int cnt = 0;for (int i = 0; i < processNum; ++i) {ProcessInfo processInfo;if (socketpair(AF_UNIX, SOCK_STREAM, 0, processInfo.pipefd) < 0) {perror("socketpair() error\n");continue;}_processes.push_back(processInfo);++cnt;close(processInfo.pipefd[CHILD_SIDE]);   /}processNum = cnt;// 建立进程池for (int i = 0; i < processNum; ++i) {pid_t pid = fork();if (pid == 0) {close(listenfd);int ret = child_process(i);exit(ret);}_processes[i].pid = pid;_processes[i].cnt = 0;}// 添加处理信号addsig(SIGCHLD, send_sig_to_process);addsig(SIGINT, send_sig_to_process);pipe(_pipefd);  // 建立信号管道_epollfd = epoll_create1(0);memset(_events, 0, sizeof(_events));addepollfd(_epollfd, listenfd);addepollfd(_epollfd, _pipefd[0]);setnonblocking(_epollfd);bool stop_server = false;while (!stop_server) {int num = epoll_wait(_epollfd, _events, MAX_EVENT_NUM, -1);if (num < 0) {perror("epollwait() error");break;}for (int i = 0; i < num; ++i) {int fd = _events[i].data.fd;int event = _events[i].events;if ((fd == listenfd) && (event & EPOLLIN)) {int connfd = -1;while ((connfd = accept(listenfd, nullptr, nullptr)) > 0) {printf("main connfd = %d\n", connfd);cnt = 100000;auto n = _processes.size();int idx = 0;for (int j = 0; j < n; ++j) {if (cnt <= _processes[j].cnt) {continue;}cnt = _processes[j].cnt;idx = j;}++_processes[idx].cnt;puts("before sendfd");// 向子进程传递文件描述符sendfd(_processes[idx].pipefd[PARENT_SIDE], connfd);puts("after sendfd");close(connfd);}} else if ((fd == _pipefd[0]) && (event & EPOLLIN)) {int sig;read(fd, (char *) &sig, sizeof(int));// 在这里添加处理信号switch (sig) {case SIGCHLD:sig_child_exit(SIGCHLD);break;case SIGINT:stop_server = true;break;default:break;}} else {  // 子进程发来的pid,表示自己有一个客户端断开连接pid_t pid;recv(fd, (char *) &pid, sizeof(pid_t), 0);for (auto &it: _processes) {if (it.pid == pid) {--it.cnt;break;}}}}}// 清理所有的孩子进程for (const auto &it: _processes) {kill(it.pid, SIGINT);}wait(nullptr);exit(EXIT_SUCCESS);
}void sendfd(int fd, int fd_to_send) {struct msghdr msg;struct iovec iov[1];char buf[0];iov[0].iov_base = buf;iov[0].iov_len = 1;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;struct cmsghdr cmsg;cmsg.cmsg_type = SCM_RIGHTS;cmsg.cmsg_level = SOL_SOCKET;cmsg.cmsg_len = CONTROL_LEN;* (int *) CMSG_DATA (&cmsg) = fd_to_send;msg.msg_control = &cmsg;msg.msg_controllen = CONTROL_LEN;sendmsg (fd, &msg, 0);
}int recvfd(int fd) {struct msghdr msg;struct iovec iov[1];char buf[0];iov[0].iov_base = buf;iov[0].iov_len = 1;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;struct cmsghdr cmsg;msg.msg_control = &cmsg;msg.msg_controllen = CONTROL_LEN;recvmsg (fd, &msg, 0);int fd_to_read = * (int *) CMSG_DATA (&cmsg);return fd_to_read;
}int addsig(int sig, void(*sig_handler)(int)) {struct sigaction sa;bzero(&sa, sizeof(sa));sa.sa_handler = sig_handler;sa.sa_flags = SA_RESTART;return sigaction(sig, &sa, nullptr);
}int setnonblocking(int fd) {auto old_option = fcntl(fd, F_GETFL);auto new_option = old_option | O_NONBLOCK;if (fcntl(fd, F_SETFL, new_option) != -1) {return new_option;}return -1;
}int addepollfd(int epollfd, int fd, int events) {epoll_event ev;bzero(&ev, sizeof(ev));ev.data.fd = fd;ev.events = events;if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) != -1) {setnonblocking(fd);return 0;}return -1;
}void send_sig_to_process(int sig) {int save_errno = errno;int msg = sig;send(_pipefd[1], (char *) &msg, 1, 0);sig = msg;errno = save_errno;
}void sig_child_exit(int sig) {if (sig != SIGCHLD) {return;}// 清理掉退出的孩子进程_processes.erase(std::remove_if(_processes.begin(),_processes.end(),[](const ProcessInfo &info) -> bool {return info.pid == waitpid(info.pid, nullptr, WNOHANG);}),_processes.end());
}int child_process(int idx) {ProcessInfo info = _processes[idx];_processes.clear();_epollfd = epoll_create1(0);if (_epollfd < 0) {return -1;}memset(_events, 0, sizeof(_events));setnonblocking(_epollfd);addsig(SIGINT, send_sig_to_process);pipe(_pipefd);  // 子进程内部通信的管道addepollfd(_epollfd, _pipefd[0]);addepollfd(_epollfd, info.pipefd[CHILD_SIDE]);close(info.pipefd[PARENT_SIDE]);  ///while (true) {int num = epoll_wait(_epollfd, _events, MAX_EVENT_NUM, -1);if (num < 0) {printf("child process %d, epoll_wait() error\n", getpid());return EXIT_FAILURE;}printf("child num: %d\n", num);for (int i = 0; i < num; ++i) {int fd = _events[i].data.fd;int event = _events[i].events;if ((fd == info.pipefd[CHILD_SIDE]) && (event & EPOLLIN)) {int readfd = recvfd(fd);assert(readfd != -1);//int ret = addepollfd(_epollfd, readfd, EPOLLIN | EPOLLRDHUP | EPOLLET);assert(ret != -1);puts("child process gets client");//} else if ((fd == _pipefd[0]) && (event & EPOLLIN)) {int sig;read(_pipefd[0], (char *) &sig, sizeof(int));switch (sig) {case SIGINT:return EXIT_SUCCESS;default:break;}} else if ((fd != _pipefd[0]) && (event & EPOLLRDHUP)) {   // 客户主动断开连接pid_t pid;send(info.pipefd[CHILD_SIDE], (char *) &pid, sizeof(pid), 0);puts("child client left");} else {  // 客户端发来的数据printf("child get client data");char buf[1024];while (recv(fd, buf, 1024, 0) > 0) {printf("%s", buf);}}}}
}

Linux下的进程池(3)相关推荐

  1. Linux下的进程池(1)

    简介 基于Linux的进程池的实现,并发进程池有多种实现模式,在这里统一进行分析.首先给出客户端的测试代码: #include <sys/socket.h> #include <sy ...

  2. Linux下的进程池(2)

    简介 在上篇笔记中,记录了无锁的accept结构,但是出现惊群现象.最简单的避免方式是添加一个信号量锁即可,参考这篇笔记即可. 代码 仅仅添加几行改动,客户端使用前一个笔记的.给出代码: #inclu ...

  3. Linux下通用线程池的创建与使用

    Linux下通用线程池的创建与使用 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整 ...

  4. linux 下得到进程的启动时间

    linux 下得到进程的启动时间! 运行方式:./pstart 进程号 " 如: ./pstart 1 #!/bin/bash pid=$1 if [ "$pid" == ...

  5. linux下查看进程占用端口和端口占用进程命令

    Linux下查看进程占用端口: 查看程序对应进程号:ps –ef|grep 进程名 REDHAT :查看进程号所占用的端口号:netstat –nltp|grep 进程号 ubuntu:查看进程占用端 ...

  6. linux下杀死进程全权讲解

    linux下杀死进程全权讲解 2009-10-27 08:57 佚名 linux 我要评论(0) 字号:T | T 本文将详细讲解linux杀死进程的多种命令,包含他们的作用,kill作用:根据进程号 ...

  7. linux下查看进程的线程数,linux查看进程的线程数

    top -H -p $PID  #查看对应进程的那个线程占用CPU过高 1.top -H 手册中说:-H : Threads toggle 加上这个选项启动top,top一行显示一个线程.否则,它一行 ...

  8. 查看linux进程的设备io,Linux下查看进程IO工具iopp

    Linux下的IO检测工具最常用的是iostat,不过iostat只能查看到总的IO情况.如果要细看具体那一个程序点用的IO较高,可以使用iotop .不过iotop对内核版本和Python版本有要求 ...

  9. linux 查看进程变量,Linux下查看进程(程序)启动时的环境变量

    Linux下查看进程(程序)启动时的环境变量 Linux的pargs ==================================== 今天又遇到一个老问题: 同事遇到了sqlplus &qu ...

最新文章

  1. linux7 kernel.sem,centos7.4内核调优,tcp单服务器万级并发
  2. matplotlib 中子图的创建
  3. vux 实现多栏滚动
  4. 充电桩服务器协议,充电桩与云服务器通信协议
  5. 笔记(2015-07-24)
  6. pg数据库开启远程连接_Postgresql开启远程访问的步骤全纪录
  7. 王道机试指南读后总结-3
  8. pidgin-lwqq
  9. C++ std::lock_guard 自动加锁、释放锁 原理
  10. Linux之常用操作命令总结二
  11. 【Python+GDAL矢量数据操作】
  12. 微信开发者工具测试方法
  13. KafKa 开启 SASL 验证
  14. 流媒体高清视频校园直播点播
  15. 华为游戏小程序快应用账号登录6004报错集合
  16. linux下删除以-开头的文件
  17. C++中map的遍历
  18. php在foreach循环后留下数组的引用问题
  19. 2022,是结束,亦是开始
  20. 【Ansoft Maxwell】Unable to locate or start COM engine on ‘Loacal Machine‘解决方案

热门文章

  1. java 快速从树节点找到数据_14期每日分享Java程序员分享超全哈希相关的知识
  2. Ingenious Lottery Tickets 模拟
  3. ## CSP 201312-2 ISBN号码(C语言)(100分)
  4. 《南溪的目标检测学习笔记》——模型预处理的学习笔记
  5. Linux的学习笔记~
  6. Python——常用Python包的学习笔记
  7. sklearn炼丹术之——交叉验证Cross-validation: evaluating estimator performance
  8. leetcode41. First Missing Positive
  9. Eclipse修改相同内容的高亮显示(pydev编辑python)
  10. 查看tensorflow版本以及路径: