进程池与线程池出发点一样,都是考虑多核情况下任务的并行处理;
从多进程和多线程编程的区别上看,多线程有许多的同步、互斥的方法,较擅长于异步协作;而多进程同步、互斥的方法相对比较麻烦,则更多地考虑上下文独立执行;
从Nginx使用线程池/进程池处理大并发的思路去分析,其实就是多客户端大量连接的场景;主进程监听是否有新客户端tcp连接,然后分发给工作进程去响应http请求,在这种场景下每个连接都是一个独立的上下文逻辑,每个工作进程的内容都是对等地处理http请求,这种情况就非常适合进程池的方式;

把上述的场景给抽象出来,关注于父子进程的通讯,也就是下图流程:

Master主进程给Worker子进程分发任务,通讯的方式用的就是单向管道的方式;

分发任务这块,可以有随机分发、轮流分发、计分板等多种方法,具体可以结合业务进行设计;

本节根据上述流程图进行一个简单的实现,假设分别有A、B、C三种任务,主进程使用轮流分发(Round-robin)把任务均匀地分配给子进程们;

在结构体process上得考虑一下,Master进程需要保存所有子进程的进程号、管道号;

/* Process struct */
typedef struct process
{char name[SIZE_NAME_NORMAL];                    /* Process name */pid_t pid;                                      /* Process id */int pipefd[2];                                  /* Connection between master/slave */size_t score;                                   /* Score record */
} process_t;/* Program instance */
typedef struct instance
{char prg_name[SIZE_NAME_LONG];                  /* Program name with path */char cfg_name[SIZE_NAME_LONG];                  /* Configure name with path */u16 process_num;                                /* Sub process number */u16 process_idx;                                /* Current process index */struct process *proc;                           /* Process struct */
} instance_t;

进程池的初始化函数,根据给定的Worker进程数开启子进程;

int process_pool(instance_t *pinst, u16 process_num)
{int ret = FAILURE;int ix  = 0;int status = 0;if ( !pinst || !process_num ) {printf("NULL\n");goto _E1;}signal(SIGINT,  __sig_quit);signal(SIGTERM, __sig_quit);pinst->process_idx = 0;pinst->process_num = process_num;pinst->proc = (process_t *)calloc(process_num + 1, sizeof(process_t));if ( !pinst->proc ) {printf("Alloc process pool struct failed\n");goto _E1;}for ( ix = 1; ix <= process_num; ix++ ) {int bufsize = 1;ret = pipe(pinst->proc[ix].pipefd);if ( SUCCESS != ret ) {printf("socketpair\n");goto _E2;}printf("Setup worker#%u\n", ix);pinst->proc[ix].pid = fork();if ( pinst->proc[ix].pid < 0 ) {printf("fork\n");goto _E2;}else if ( pinst->proc[ix].pid > 0 ) {/* Father */CLOSE_FD(pinst->proc[ix].pipefd[0]);continue;}else {/* Child */CLOSE_FD(pinst->proc[ix].pipefd[1]);pinst->process_idx = ix;ret = __worker(pinst);goto _E2;}}ret = __master(pinst);/* Waiting workers */for ( ix = 1; ix <= pinst->process_num; ix++ ) {waitpid(pinst->proc[ix].pid, &status, WNOHANG);}_E2:for ( ix = 1; ix <= pinst->process_num; ix++ ) {CLOSE_FD(pinst->proc[ix].pipefd[1]);CLOSE_FD(pinst->proc[ix].pipefd[0]);}FREE_POINTER(pinst->proc);
_E1:return ret;
}

然后就是Master Worker的实现,Master就是简单地进行一个事件分发,通过约定的协议"ABC"代表三种工作命令,“Q“代表退出进程;

static int __master(instance_t *pinst)
{int ret  = 0;int fd   = 0;int ix   = 0;int roll = 0;char c = 0;#define __offset(pinst) ((pinst)->proc[(pinst)->process_idx])
#define __round_robin(pinst, roll) \((pinst)->proc[((roll) % (pinst)->process_num) + 1].pipefd[1])printf("Master#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {/* Get pipe fd by round-robin */fd = __round_robin(pinst, ++roll);c = 'A' + roll % 3; // 'A'/'B'/'C'ret = write(fd, &c, 1);if ( ret <= 0 ) {return FAILURE;}sleep(1);}/* Tell all workers to quit */for ( ix = 1; ix <= pinst->process_num; ix++ ) {c = 'Q';write(__round_robin(pinst, ++roll), &c, 1);}printf("Master#%u shutdown\n", pinst->process_idx);return SUCCESS;
}
static int __worker(instance_t *pinst)
{int fd = __offset(pinst).pipefd[0];int ix = 0;ssize_t read_byte = FAILURE;char buffer[1024] = {0};printf("Worker#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {read_byte = read(fd, buffer, sizeof(buffer));if ( read_byte <= 0 ) {if ( errno == EAGAIN || errno == EINTR ) {continue;}return FAILURE;}for ( ix = 0; ix < read_byte; ix++ ) {switch ( buffer[ix] ) {case 'A':case 'B':case 'C':__offset(pinst).score += buffer[ix];printf("Worker#%u Recv command: %c, score: %llu\n",pinst->process_idx,buffer[ix], __offset(pinst).score);break;case 'Q':printf("Quit\n");g_enable = 0;break;default:break;}}}printf("Worker#%u shutdown\n", pinst->process_idx);return SUCCESS;
}

最后带一个main方法,考虑了进程释放的问题,在这个demo中使用信号对genable进行控制;

static u8 g_enable;                         /* Running flag */
static void __sig_quit(int sig)
{g_enable = 0;
}int main(int argc, char *argv[])
{instance_t inst = {0};if ( argc < 2 ) {printf("Usage: \n\t%s < process number >\n", argv[0]);return EXIT_FAILURE;}return process_pool(&inst, atoi(argv[1]));
}

完整代码如下:

/****根据上述流程图进行一个简单的实现,假设分别有A、B、C三种任务,主进程使用轮流分发(Round-robin)把任务均匀地分配给子进程们;*在结构体process上得考虑一下,Master进程需要保存所有子进程的进程号、管道号;**/#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>#define FAILURE -1
#define u8  unsigned char#define u16 unsigned  short#define SIZE_NAME_NORMAL 48#define SIZE_NAME_LONG 48#define SUCCESS 0#define CLOSE_FD(x) (close(x))#define FREE_POINTER(x) (free(x))
#define __offset(pinst) ((pinst)->proc[(pinst)->process_idx])/* 写管道*/
#define __round_robin(pinst, roll) \((pinst)->proc[((roll) % (pinst)->process_num) + 1].pipefd[1])
/****本节根据上述流程图进行一个简单的实现,假设分别有A、B、C三种任务,主进程使用轮流分发(Round-robin)把任务均匀地分配给子进程们;*在结构体process上得考虑一下,Master进程需要保存所有子进程的进程号、管道号;**//* Process struct */
typedef struct process
{char name[SIZE_NAME_NORMAL];                    /* Process name */pid_t pid;                                      /* Process id */int pipefd[2];                                  /* Connection between master/slave */size_t score;                                   /* Score record */
} process_t;/* Program instance */
typedef struct instance
{char prg_name[SIZE_NAME_LONG];                  /* Program name with path */char cfg_name[SIZE_NAME_LONG];                  /* Configure name with path */u16 process_num;                                /* Sub process number */u16 process_idx;                                /* Current process index */struct process *proc;                           /* Process struct */} instance_t;static u8 g_enable;                         /* Running flag */static void __sig_quit(int sig)
{g_enable = 0;
}static int __master(instance_t *pinst)
{int ret  = 0;int fd   = 0;int ix   = 0;int roll = 0;char c = 0;printf("Master#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {/* Get pipe fd by round-robin */fd = __round_robin(pinst, ++roll);c = 'A' + roll % 3; // 'A'/'B'/'C'ret = write(fd, &c, 1);if ( ret <= 0 ) {return FAILURE;}sleep(1);}/* Tell all workers to quit */for ( ix = 1; ix <= pinst->process_num; ix++ ) {c = 'Q';write(__round_robin(pinst, ++roll), &c, 1);}printf("Master#%u shutdown\n", pinst->process_idx);return SUCCESS;
}static int __worker(instance_t *pinst)
{int fd = __offset(pinst).pipefd[0];int ix = 0;ssize_t read_byte = FAILURE;char buffer[1024] = {0};printf("Worker#%u setup\n", pinst->process_idx);for ( g_enable = 1; g_enable; ) {printf("work read front......\n");read_byte = read(fd, buffer, sizeof(buffer)); /* 读管道会阻塞*/printf("work read after......\n");if ( read_byte <= 0 ) {if ( errno == EAGAIN || errno == EINTR ) {continue;}return FAILURE;}for ( ix = 0; ix < read_byte; ix++ ) {switch ( buffer[ix] ) {case 'A':case 'B':case 'C':__offset(pinst).score += buffer[ix];printf("Worker#%u Recv command: %c, score: %llu\n",pinst->process_idx,buffer[ix], __offset(pinst).score);break;case 'Q':printf("Quit\n");g_enable = 0;break;default:break;}}}printf("Worker#%u shutdown\n", pinst->process_idx);return SUCCESS;
}int process_pool(instance_t *pinst, u16 process_num)
{int ret = FAILURE;int ix  = 0;int status = 0;if ( !pinst || !process_num ) {printf("NULL\n");goto _E1;}/*** SIGINT ctrl+c 终止进程*/signal(SIGINT,  __sig_quit);/** SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和*处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这*个信号. */signal(SIGTERM, __sig_quit);pinst->process_idx = 0;pinst->process_num = process_num;pinst->proc = (process_t *)calloc(process_num + 1, sizeof(process_t));if ( !pinst->proc ) {printf("Alloc process pool struct failed\n");goto _E1;}for ( ix = 1; ix <= process_num; ix++ ) {int bufsize = 1;/*fd[0]:读管道,fd[1]:写管道*/ret = pipe(pinst->proc[ix].pipefd);if ( SUCCESS != ret ) {printf("socketpair\n");goto _E2;}printf("Setup worker#%u\n", ix);pinst->proc[ix].pid = fork();if ( pinst->proc[ix].pid < 0 ) {printf("fork\n");goto _E2;}else if ( pinst->proc[ix].pid > 0 ) {/* Father process close read fd pipe */CLOSE_FD(pinst->proc[ix].pipefd[0]);printf(".......\n");continue;}else {/* Child process close write fd pipe */CLOSE_FD(pinst->proc[ix].pipefd[1]);printf("xxxxxxxx\n");pinst->process_idx = ix;ret = __worker(pinst);goto _E2;}}ret = __master(pinst);/* Waiting workers */for ( ix = 1; ix <= pinst->process_num; ix++ ) {waitpid(pinst->proc[ix].pid, &status, WNOHANG);}_E2:for ( ix = 1; ix <= pinst->process_num; ix++ ) {CLOSE_FD(pinst->proc[ix].pipefd[1]);CLOSE_FD(pinst->proc[ix].pipefd[0]);}FREE_POINTER(pinst->proc);_E1:return ret;
}int main(int argc, char *argv[])
{instance_t inst = {0};  /*实例结构体*/if ( argc < 2 ) {printf("Usage: \n\t%s < process number >\n", argv[0]);return EXIT_FAILURE;}return process_pool(&inst, atoi(argv[1]));
}

c语言进程池原理及实现相关推荐

  1. Linux——匿名管道、命名管道及进程池概念和实现原理

    目录 一.什么是匿名管道 二.如何使用匿名管道 (一).pipe原理 (二).pipe使用 三.命名管道概念及区别 (一).什么是命名管道 (二).与匿名管道的联系和区别 四.命名管道的使用 (一). ...

  2. Linux高性能服务器编程:进程池和线程池原理及应用(有图有代码有真相!!!)

    一.问题引入 在前面编写多进程.多线程服务器时通过动态创建子进程和子线程来实现并发服务器,这样做有以下缺点: 1)动态创建进程.线程将会比较耗费时间,将导致较慢的客户响应. 2)动态创建的子进程只为一 ...

  3. python进程池的实现原理_Python基于进程池实现多进程过程解析

    1.注意:pool必须在 if __name__ == '__main__' 下面运行,不然会报错 2.多进程内出现错误会直接跳过该进程,并且默认不会打印错误信息 3.if__name__下面的数据需 ...

  4. Python实验项目1例:使用进程池统计指定范围内素数的个数

    本周赠书活动:董付国老师Python系列教材赠书活动(40本) -------------------------------- 适用专业: 适用于计算机.网络工程.软件工程等相关专业,其他专业选做. ...

  5. java并发包线程池原理分析锁的深度化

    java并发包&线程池原理分析&锁的深度化 并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素 ...

  6. 【5】线程池原理分析

    目录 知识点1:并发包 1.(计数器)CountDownLatch 2.(屏障)CyclicBarrier 3.(计数信号量)Semaphore (1)案例 4.并发队列 5.阻塞队列与非阻塞队 (1 ...

  7. Python线程池、进程池的介绍与使用

    文章目录 1.问题背景 2.单线程→\rightarrow→多线程→\rightarrow→线程池 2.1单线程简介 2.2多线程简介 2.3线程池介绍 2.3.1复用线程 2.3.2线程池的使用 3 ...

  8. java架构学习——5. 线程池原理剖析锁的深度化

    本篇博文主要包含: 线程池的基本概念 ThreadPoolExecutor 线程池四种创建方式 -newCachedThreadPool:可缓存线程池 -newFixedThreadPool:定长线程 ...

  9. Golang 侧数据库连接池原理和参数调优

    Golang 侧数据库连接池原理和参数调优 文章目录 Golang 侧数据库连接池原理和参数调优 数据库连接池 数据库连接池的设计 Go 的数据库连接池 Go 数据库连接池的设计 建立连接 释放连接 ...

  10. python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现

    概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...

最新文章

  1. PhpStorm 默认快捷键
  2. JavaScript实现最小公倍数LCM算法(附完整源码)
  3. 【Python3网络爬虫开发实战】 1-开发环境配置
  4. Linux多进程编程(2)
  5. mysql怎么更改gbk_把mysql 中的字符gb2312 改为gbk的方法
  6. CDHtmlDialog探索----WebBrowser扩展和网页Javascript错误处理
  7. nodejs 下载最新版本
  8. 【华为OD机试真题 JS】字符串分割
  9. GNN IS A COUNTER REVISITING GNN FOR QUESTION ANSWERING
  10. 前端css实现气泡框
  11. CentOS7.5下载及安装过程
  12. Machine Learning读书会·北京今日启动(第3期周爱民、张帆)
  13. 自己写的一点福利代码(一)
  14. android dp不同高度,Android获取屏幕的宽度和高度(dp)
  15. 第一单元测试问星卷计算机,【第一单元综合测试卷测量】第一单元综合测试卷答案...
  16. Web的组成架构模型
  17. 信息传递(tarjan算法)
  18. k8s.4-kubeadm部署高可用kubernetes集群 1.21
  19. BUGKU——秋名山/never give up
  20. Keil uvision5 C51软件安装教程附下载地址

热门文章

  1. 十大java视频学习网站
  2. 2020爱分析·智能通讯云厂商全景报告
  3. NLPIR系统的中文语义分析模式介绍
  4. 在Winfrom中双击Ctrl键打开窗体
  5. poi导出如何设定宽度_POI导出excel列宽自适应
  6. java做校园一卡通技术_基于jsp的校园一卡通管理系统-JavaEE实现校园一卡通管理系统 - java项目源码...
  7. Java连接数据库实现增删改;查。
  8. 数据库SQL语句之外键
  9. 基于物联网平台开发手机混合 App
  10. 信号与系统——有关卷积的意义