Linux网络编程8——线程池模型
学习视频链接
05-线程池模型原理分析_bilibili_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1iJ411S7UA?p=91&vd_source=0471cde1c644648fafd07b54e303c905
目录
一、简介
1.1 以前程序存在的问题
1.2 解决方法
1.3 处理线程扩容或者减少
二、代码
2.1 分析
2.2 代码
一、简介
1.1 以前程序存在的问题
多线程服务器,每次客户端发过来信息服务端就会创建一个线程,处理完了再销毁线程开销有点大
1.2 解决方法
提前创建好线程,线程阻塞等待某个客户端数据来了后,分配一个线程处理数据,处理完了以后线程不删除,继续阻塞
1.3 处理线程扩容或者减少
半夜人少的时候减少线程的使用
白天人多的时候扩容线程,线程数量不够用的时候也需要扩容
二、代码
2.1 分析
结构体
模块分析
1、main();
创建线程池
向线程池中添加任务。借助回调处理任务
销毁线程池
2、pthreadpool_create();
创建线程池结构体指针
初始化线程池结构体 { N 个成员变量 }
创建 N 个任务线程
创建 1 个管理者线程
失败时,销毁开辟的所有空间 (释放)
3、threadpool_thread()
进入子线程回调函数
接收参数 void *arg -> pool 结构体
加锁 -> lock -> 整个结构体锁
判断条件变量 -> wait
4、adjust_thread();
循环 10s 执行一次
进入管理者线程回调函数
接收参数 void *arg -> pool 结构体
加锁 -> lock -> 整个结构体锁
获取管理线程池要用的到变量 task_num,live_num,busy_num
根据既定算法,使用上述 3 变量,判断是否应该创建、销毁线程池中指定步长的线程
5、threadpool_add();
总功能:
模拟产生任务。 num[20]
设置回调函数,处理任务。 sleep(1) 代表处理完成
内部实现:
初始化 任务队列结构体成员。 回调函数 function,arg
利用环形队列机制,实现添加任务。 借助队尾指针挪移 % 实现
唤醒阻塞在 条件变量上的线程
解锁
6、从 3、中 wait 之后继续执行,处理任务
加锁
获取 任务处理回调函数及参数
利用环形队列,实现处理任务。借助队头指针挪移 % 实现
唤醒阻塞在 条件变量 上的 server
解锁
加锁
改忙线程数++
解锁
执行处理任务的线程
加锁
改忙线程数--
解锁
7、创建、销毁线程
管理者线程根据 task_num、live_num、busy_num
根据既定算法,使用上述 3 变量,判断是否应该 创建、销毁线程池中 指定步长的线程
如果满足 创建条件
pthread_create(); 回调 任务线程函数 live_num++
如果满足 销毁条件
wait_exit_thr_num = 10;
signal 给 阻塞在条件变量上的线程 发送 假条件满足信号
跳转至 wait 阻塞线程会被 假信号 唤醒。 wait_exit_thr_num > 0 pthread_exit();
2.2 代码
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
//#include "threadpool.h"#define DEFAULT_TIME 10 // 10s检测一次
#define MIN_WAIT_TASK_NUM 10 // 如果 queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池
#define DEFAULT_THREAD_VARY 10 // 每次创建和销毁线程的个数
#define true 1
#define false 0typedef struct {void *(*function)(void *); // 指针函数,回调函数void *arg; // 上面函数的参数
} threadpool_task_t; // 各子线程任务结构体// 描述线程池相关信息
typedef struct threadpool_t { pthread_mutex_t lock; // 用于锁住本结构体pthread_mutex_t thread_counter; // 记录忙状态线程个数的锁 busy_thr_numpthread_cond_t queue_not_full; // 当任务队列满时,添加任务的线程阻塞,等待此条件变量pthread_cond_t queue_not_empty; // 当任务队列里不为空时,通知等待任务的线程pthread_t *threads; // 存放线程池中每个线程的 tid 数组pthread_t adjust_tid; // 管理线程 tidthreadpool_task_t *task_queue; // 任务队列(数组首地址)int min_thr_num; // 线程池最小线程数 int max_thr_num; // 线程池最大线程数int live_thr_num; // 当前存活线程个数int busy_thr_num; // 忙状态线程个数int wait_exit_thr_num; // 要销毁的线程个数int queue_front; // task_queue 队头下标int queue_rear; // task_queue 队尾下标int queue_size; // task_queue 队中实际任务数int queue_max_size; // task_queue 队列可容纳任务数上限int shutdown; // 标志位,线程池使用状态,true 或 false
} threadpool_t;void *threadpool_thread(void *threadpool);
void *adjust_thread(void *threadpool);
int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);// threadpool_ create(3, 100, 100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL; // 线程池结构体do {if((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL) {printf("malloc threadpool fail");break; // 跳出 do while}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; // 活着的线程数初值=最小线程数pool->wait_exit_thr_num = 0;pool->queue_size = 0; // 有0个产品pool->queue_max_size = queue_max_size; // 最大任务队列数pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; // 不关闭线程池// 根据最大线程上限数,给工作线程数组开辟空间,并清零pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num); // 数组清零// 给任务队列开辟空间pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}// 初始化互斥琐、条件变量if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}// 启动 min_thr_num 个 work threadfor (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); // pool 指向当前线程池printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); // 创建管理者线程return pool;} while(0);threadpool_free(pool); // 前面代码调用失败时,释放poll存储空间return NULL;
}// 向线程池中添加一个任务
// threadpool_add(thp, process, (void*)&num[i]); // 向线程池中添加任务 process:小写->大写
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));// ==为真,队列已经满,调wait阻塞while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}// 清空工作线程调用的回调函数的参数 argif (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}// 添加任务到任务队列里pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; // 队尾指针移动,模拟环形pool->queue_size++;// 添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}// 线程池中各个工作线程
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {// Lock must be taken to wait on conditional variable// 刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务pthread_mutex_lock(&(pool->lock));// queue_ size == 0说明没有任务,调wait阻塞在条件变量上,若有任务,跳过该whilewhile ((pool->queue_size == 0) && (!pool->shutdown)) {printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));// 清除指定数目的空闲线程,如果要结束的线程个数大于 0 ,结束线程if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;// 如果线程池里线程个数大于最小值时可以结束当前线程if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--; pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}// 如果指定了 true,要关闭线程池里的每个线程,自行退出处理---销毁线程池if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); // 线程自行结束}// 从任务队列里获取任务,是一个出队操作task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg; pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; // 出队,模拟环形队列pool->queue_size--; // 通知可以有新的任务添加进来pthread_cond_broadcast(&(pool->queue_not_full));// 任务取出后,立即将线程池琐释放pthread_mutex_unlock(&(pool->lock));// 执行任务printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); // 忙状态线程数变量琐pool->busy_thr_num++; // 忙状态线程数+1pthread_mutex_unloqk(&(pool->thread_counter)); (*(task.function))(task.arg); // 执行回调函数任务// task.function(task.arg); // 执行回调函数任务// 任务结束处理printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; // 处理掉一个任务,忙状态数线程数 -1pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}// 管理线程
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME); // 定时对线程池管理pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size; // 关注任务数int live_thr_num = pool->live_thr_num; // 存活线程数pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; // 忙着的线程数pthread_mutex_unlock(&(pool->thread_counter));// 创建新线程算法: 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时 如: 30>=10 && 40<100if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock));int add = 0;// 一次增加 DEFAULT_THREAD 个线程for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);}}pthread_mutex_unlock(&(pool->lock));}// 销毁多余的空闲线程 算法: 忙线程 X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {// 一次销毁 DEFAULT_THREAD 个线程,随即 10 个即可pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; // 要销毁的线程数设置为 10pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {// 通知处在空闲状态的线程,他们会自行停止pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;// 先销毁管理线程pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {// 通知所有的空闲线程pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if(pool == NULL) {return -1;}if(pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1; // 总线程数pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num; // 存活线程数pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}/*int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1; // 忙线程数pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num;pthread_mutex_unlock(&(pool->thread_counter));
}*/// 线程池中的线程,模拟处理业务
void *process(void *arg)
{printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), (int)arg);sleep(1);printf("task %d is end\n", (int)arg);return NULL;
}int main(void)
{// threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);threadpool_t *thp = threadpool_create(3, 100, 100); // 创建线程池,池里最小3个线程,最大100,队列最大100printf("pool inited");// int *num = (int*)malloc(sizeof(int) * 20);int num[20], i;for(i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n", i);// int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务*/}sleep(10); // 等子线程完成任务threadpool_destroy(thp);return 0;
}
Linux网络编程8——线程池模型相关推荐
- Linux网络编程---I/O复用模型之epoll
https://blog.csdn.net/men_wen/article/details/53456474 Linux网络编程-I/O复用模型之epoll 1. epoll模型简介 epoll是Li ...
- Linux网络编程---I/O复用模型之poll
https://blog.csdn.net/men_wen/article/details/53456474 Linux网络编程-I/O复用模型之poll 1.函数poll poll系统调用和sele ...
- Linux网络编程---I/O复用模型之select
https://blog.csdn.net/men_wen/article/details/53456435 Linux网络编程-I/O复用模型之select 1. IO复用模型 IO复用能够预先告知 ...
- 【Linux系统编程】线程池
00. 目录 文章目录 00. 目录 01. 线程池原理 02. 线程池应用实例 03. 线程池代码 04. 附录 01. 线程池原理 在传统服务器结构中,常是有一个总的监听线程监听有没有新的用户连接 ...
- [Linux网络编程]高并发-Select模型
概要介绍 一般情况下,处理socket通信一个线程或者进程在处理读写的时候要么阻塞在那一直等要么非阻塞然后过会查看是否可读可写,这样会浪费大量的资源,假如需要对多个套接字进行处理读写那么得开很多个线程 ...
- linux 网络7层模型,Linux网络编程——OSI七层模型、TCP/IP模型
OSI七层模型 开放式系统互连(Open System Interconnect),模型分为7层,从下往上依次为: 物理层: 数据链路层: 网络层: 传输层: 会话层: 表示层: 应用层: 记不住怎么 ...
- Linux网络编程7——epoll反应堆模型
学习视频链接 16-epoll反应堆main逻辑_bilibili_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1iJ411S7UA?p=81& ...
- C++教程网之Linux网络编程视频 Unix网络编程视频
教程非常不错,价值280元,绝对是干货 Linux网络编程(总共41集) 讲解Linux网络编程知识,分以下四个篇章. Linux网络编程之TCP/IP基础篇 Linux网络编程之socket编程篇 ...
- linux网络编程(四)线程池
linux网络编程(四)线程池 为什么会有线程池? 实现简单的线程池 为什么会有线程池? 大多数的服务器可能都有这样一种情况,就是会在单位时间内接收到大量客户端请求,我们可以采取接受到客户端请求创建一 ...
最新文章
- C语言编译全过程剖析
- RPM方式安装MySQL5.6和windows下安装mysql解压版
- tomcat 增加运行内存
- MySQL 基础 ———— 分组查询
- pitr 原理_pgsql的备份和恢复
- 深度学习:batch_size和学习率 及如何调整
- 运筹学——线性规划及单纯形法求解
- CString和string的互相转换
- Linux局域网多人聊天软件
- EXE4J 错误提醒 Pleasedefine EXE4J_JAVA_HOME to point to an installes 64-bit JDK or JRE
- pcd格式点云的显示程序
- winform控件焦点设置
- Cocos Creator ts版本使用protobuf
- 2016 威斯康星 计算机科学,威斯康星麦迪逊大学计算机科学本科申请条件及案例分析...
- 动态修改窗口标题和类名
- java面向对象实验结论及心得_20162305 实验二 Java面向对象程序设计 实验报告
- java对银行卡号、手机号码、身份证号码进行脱敏
- fsck-磁盘修复工具
- The History Began from AlexNet: A Comprehensive Survey on Deep Learning Approaches
- JDBC (Java DB Connection)---Java数据库连接