多线程编程—线程池的实现
多线程编程—线程池的实现
执行与任务分离的组件— 线程池
https://github.com/wangbojing/threadpool
多线程技术主要解决了处理器单元内多个线程执行的问题,它可以显著的减少处理器单元的闲置时间,增加处理器单元的吞吐能力。线程池是多线程编程的一个必要组件,并且对于很多编程人员都是透明的,更是神秘的。有幸能为大家解析其中缘由,尚有不妥之处,欢迎大家抛砖。
线程池的概念,是一个用来管理一组执行任务线程的工具。既然是管理工具,那么该工具管理是用来管理任务与执行的。如图一线程池组件拓扑图,执行队列(Workers),任务队列(Jobs)和池管理(Pool Manager)三部分组成。
执行队列(Workers)是用来存放运行线程的队列。
任务队列(Jobs)是用来存放需要被执行的任务队列。
池管理(Pool Manager)主要是管理执行队列的执行顺序,执行任务的时间长短,对长时间没有使用的执行单元进行释放,执行单元满负荷运行的时及时添加执行单元;记录未执行的任务数量,对新任务入队,即将执行的任务出队等等。
图一 线程池组件拓扑图
执行队列(Workers)中的每一个执行单元(Worker)由哪些元素组成?线程ID,退出标志。
任务队列(Jobs)中的每一个任务(Jobs)的组成元素?执行每一个任务的具体执行函数,每一个任务的执行参数。
池管理(Pool Manager)由哪些元素组成?每一个新任务添加与执行时的移除用的互斥锁,每一个线程挂起的时所等待的条件变量。
根据分析如图二线程池的类图。
图二线程池的类图
到这里一个简单的线程池就已经可以呼之欲出了。以下为实现代码
/** Author: WangBoJing* email: 1989wangbojing@gmail.com * github: https://github.com/wangbojing*/
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>#define LL_ADD(item, list) do { \item->prev = NULL; \item->next = list; \list = item; \
} while(0)#define LL_REMOVE(item, list) do { \if (item->prev != NULL) item->prev->next = item->next; \if (item->next != NULL) item->next->prev = item->prev; \if (list == item) list = item->next; \item->prev = item->next = NULL; \
} while(0)typedef void (*JOB_CALLBACK)(void *);
struct NTHREADPOOL;typedef struct NWORKER {pthread_t thread;int terminate;struct NTHREADPOOL *pool;struct NWORKER *next;struct NWORKER *prev;
} nWorker;typedef struct NJOB {JOB_CALLBACK job_func;void *arg;struct NJOB *next;struct NJOB *prev;
} nJob;typedef struct NTHREADPOOL {struct NWORKER *workers;struct NJOB *jobs;pthread_mutex_t jobs_mtx;pthread_cond_t jobs_cond;
} nThreadPool;void *ntyWorkerThread(void *arg) {nWorker *worker = (nWorker*)arg;while (1) {pthread_mutex_lock(&worker->pool->jobs_mtx);while (worker->pool->jobs == NULL) {if (worker->terminate) break;pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);}if (worker->terminate) {pthread_mutex_unlock(&worker->pool->jobs_mtx);break;}nJob *job = worker->pool->jobs;if (job != NULL) {LL_REMOVE(job, worker->pool->jobs);}pthread_mutex_unlock(&worker->pool->jobs_mtx);if (job == NULL) continue;job->job_func(job);usleep(1);}free(worker);pthread_exit(NULL);}int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {if (pool == NULL) return 1;if (numWorkers < 1) numWorkers = 1;memset(pool, 0, sizeof(nThreadPool));pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));int i = 0;for (i = 0;i < numWorkers;i ++) {nWorker *worker = (nWorker*)malloc(sizeof(nWorker));if (worker == NULL) {perror("malloc");return 1;}memset(worker, 0, sizeof(nWorker));worker->pool = pool;int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void*)worker);if (ret) {perror("pthread_create");free(worker);return 1;}LL_ADD(worker, worker->pool->workers);}
}void ntyThreadPoolShutdown(nThreadPool *pool) {nWorker *worker = NULL;for (worker = pool->workers;worker != NULL;worker = worker->next) {worker->terminate = 1;}pthread_mutex_lock(&pool->jobs_mtx);pool->workers = NULL;pool->jobs = NULL;pthread_cond_broadcast(&pool->jobs_cond);pthread_mutex_unlock(&pool->jobs_mtx);}void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {pthread_mutex_lock(&pool->jobs_mtx);LL_ADD(job, pool->jobs);pthread_cond_signal(&pool->jobs_cond);pthread_mutex_unlock(&pool->jobs_mtx);}/********************************* debug thread pool *********************************/#define KING_MAX_THREADS 80
#define KING_COUNTER_SIZE 1000void king_counter(void *arg) {nJob *job = (nJob*)arg;int index = *(int *)job->arg;printf("index: %d, selfid:%lu\n", index, pthread_self());free(job->arg);free(job);
}int main(int argc, char *argv[]) {nThreadPool pool;ntyThreadPoolCreate(&pool, KING_MAX_THREADS);int i = 0;for (i = 0;i < KING_COUNTER_SIZE;i ++) {nJob *job = (nJob*)malloc(sizeof(nJob));if (job == NULL) {perror("malloc");exit(1);}job->job_func = king_counter;job->arg = malloc(sizeof(int));*(int*)job->arg = i;ntyThreadPoolPush(&pool, job);}getchar();printf("You are very good !!!!\n");}
这样的线程池还是只是一个Demo,原因有如下几点需要我们值得改进的。
线程池的线程数量是确定的,不能随着系统任务请求数量放缩线程池的大小。
任务数量的统计,并没有对任务队列进行统计
执行任务中的线程数量,等待执行的任务数量进行统计
每一个执行任务的时间没有做限制,
IO密集型与计算密集型区分,线程池非常常用,但是根据不同的业务场景需要设置不同配置
在用户任务执行函数里,用户主动的调用了pthread_exit退出线程的保护机制
针对于以上几点问题,改进了一版线程池
/** Author: WangBoJing* email: 1989wangbojing@gmail.com * github: https://github.com/wangbojing*/#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>typedef void (*JOB_CALLBACK)(void *);typedef struct NJOB {struct NJOB *next;JOB_CALLBACK func;void *arg;
} nJob;typedef struct NWORKER {struct NWORKER *active_next;pthread_t active_tid;
} nWorker;typedef struct NTHREADPOOL {struct NTHREADPOOL *forw;struct NTHREADPOOL *back;pthread_mutex_t mtx;pthread_cond_t busycv;pthread_cond_t workcv;pthread_cond_t waitcv;nWorker *active;nJob *head;nJob *tail;pthread_attr_t attr;int flags;unsigned int linger;int minimum;int maximum;int nthreads;int idle;} nThreadPool;static void* ntyWorkerThread(void *arg);
#define NTY_POOL_WAIT 0x01
#define NTY_POOL_DESTROY 0x02static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static sigset_t fillset;
nThreadPool *thread_pool = NULL;static int ntyWorkerCreate(nThreadPool *pool) {sigset_t oset;pthread_t thread_id;pthread_sigmask(SIG_SETMASK, &fillset, &oset);int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool);pthread_sigmask(SIG_SETMASK, &oset, NULL);return error;
}static void ntyWorkerCleanup(nThreadPool * pool) {--pool->nthreads;if (pool->flags & NTY_POOL_DESTROY) {if (pool->nthreads == 0) {pthread_cond_broadcast(&pool->busycv);}} else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {pool->nthreads ++;}pthread_mutex_unlock(&pool->mtx);}static void ntyNotifyWaiters(nThreadPool *pool) {if (pool->head == NULL && pool->active == NULL) {pool->flags &= ~NTY_POOL_WAIT;pthread_cond_broadcast(&pool->waitcv);}}static void ntyJobCleanup(nThreadPool *pool) {pthread_t tid = pthread_self();nWorker *activep;nWorker **activepp;pthread_mutex_lock(&pool->mtx);for (activepp = &pool->active;(activep = *activepp) != NULL;activepp = &activep->active_next) {*activepp = activep->active_next;break;}if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);}static void* ntyWorkerThread(void *arg) {nThreadPool *pool = (nThreadPool*)arg;nWorker active;int timeout;struct timespec ts;JOB_CALLBACK func;pthread_mutex_lock(&pool->mtx);pthread_cleanup_push(ntyWorkerCleanup, pool);active.active_tid = pthread_self();while (1) {pthread_sigmask(SIG_SETMASK, &fillset, NULL);pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);timeout = 0;pool->idle ++;if (pool->flags & NTY_POOL_WAIT) {ntyNotifyWaiters(pool);}while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {if (pool->nthreads <= pool->minimum) {pthread_cond_wait(&pool->workcv, &pool->mtx);} else {clock_gettime(CLOCK_REALTIME, &ts);ts.tv_sec += pool->linger;if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {timeout = 1;break;}}}pool->idle --;if (pool->flags & NTY_POOL_DESTROY) break;nJob *job = pool->head; if (job != NULL) {timeout = 0;func = job->func;void *job_arg = job->arg;pool->head = job->next;if (job == pool->tail) {pool->tail == NULL;}active.active_next = pool->active;pool->active = &active;pthread_mutex_unlock(&pool->mtx);pthread_cleanup_push(ntyJobCleanup, pool);free(job);func(job_arg);pthread_cleanup_pop(1);}if (timeout && (pool->nthreads > pool->minimum)) {break;}}pthread_cleanup_pop(1);return NULL;}static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {struct sched_param param;void *addr;size_t size;int value;pthread_attr_init(new_attr);if (old_attr != NULL) {pthread_attr_getstack(old_attr, &addr, &size);pthread_attr_setstack(new_attr, NULL, size);pthread_attr_getscope(old_attr, &value);pthread_attr_setscope(new_attr, value);pthread_attr_getinheritsched(old_attr, &value);pthread_attr_setinheritsched(new_attr, value);pthread_attr_getschedpolicy(old_attr, &value);pthread_attr_setschedpolicy(new_attr, value);pthread_attr_getschedparam(old_attr, ¶m);pthread_attr_setschedparam(new_attr, ¶m);pthread_attr_getguardsize(old_attr, &size);pthread_attr_setguardsize(new_attr, size);}pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);}nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {sigfillset(&fillset);if (min_threads > max_threads || max_threads < 1) {errno = EINVAL;return NULL;}nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool));if (pool == NULL) {errno = ENOMEM;return NULL;}pthread_mutex_init(&pool->mtx, NULL);pthread_cond_init(&pool->busycv, NULL);pthread_cond_init(&pool->workcv, NULL);pthread_cond_init(&pool->waitcv, NULL);pool->active = NULL;pool->head = NULL;pool->tail = NULL;pool->flags = 0;pool->linger = linger;pool->minimum = min_threads;pool->maximum = max_threads;pool->nthreads = 0;pool->idle = 0;ntyCloneAttributes(&pool->attr, attr);pthread_mutex_lock(&nty_pool_lock);if (thread_pool == NULL) {pool->forw = pool;pool->back = pool;thread_pool = pool;} else {thread_pool->back->forw = pool;pool->forw = thread_pool;pool->back = pool->back;thread_pool->back = pool;}pthread_mutex_unlock(&nty_pool_lock);return pool;}int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {nJob *job = (nJob*)malloc(sizeof(nJob));if (job == NULL) {errno = ENOMEM;return -1;}job->next = NULL;job->func = func;job->arg = arg;pthread_mutex_lock(&pool->mtx);if (pool->head == NULL) {pool->head = job;} else {pool->tail->next = job;}pool->tail = job;if (pool->idle > 0) {pthread_cond_signal(&pool->workcv);} else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {pool->nthreads ++;}pthread_mutex_unlock(&pool->mtx);
}void nThreadPoolWait(nThreadPool *pool) {pthread_mutex_lock(&pool->mtx);pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);while (pool->head != NULL || pool->active != NULL) {pool->flags |= NTY_POOL_WAIT;pthread_cond_wait(&pool->waitcv, &pool->mtx);}pthread_cleanup_pop(1);
}void nThreadPoolDestroy(nThreadPool *pool) {nWorker *activep;nJob *job;pthread_mutex_lock(&pool->mtx);pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);pool->flags |= NTY_POOL_DESTROY;pthread_cond_broadcast(&pool->workcv);for (activep = pool->active;activep != NULL;activep = activep->active_next) {pthread_cancel(activep->active_tid);}while (pool->nthreads != 0) {pthread_cond_wait(&pool->busycv, &pool->mtx);}pthread_cleanup_pop(1);pthread_mutex_lock(&nty_pool_lock);if (thread_pool == pool) {thread_pool = pool->forw;}if (thread_pool == pool) {thread_pool = NULL;} else {pool->back->forw = pool->forw;pool->forw->back = pool->back;}pthread_mutex_unlock(&nty_pool_lock);for (job = pool->head;job != NULL;job = pool->head) {pool->head = job->next;free(job);}pthread_attr_destroy(&pool->attr);free(pool);}/********************************* debug thread pool *********************************/void king_counter(void *arg) {int index = *(int*)arg;printf("index : %d, selfid : %lu\n", index, pthread_self());free(arg);usleep(1);
}#define KING_COUNTER_SIZE 1000int main(int argc, char *argv[]) {nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);int i = 0;for (i = 0;i < KING_COUNTER_SIZE;i ++) {int *index = (int*)malloc(sizeof(int));memset(index, 0, sizeof(int));memcpy(index, &i, sizeof(int));ntyThreadPoolQueue(pool, king_counter, index);}getchar();printf("You are very good !!!!\n");
}
转载于:https://blog.51cto.com/wangbojing/1968999
多线程编程—线程池的实现相关推荐
- 多线程之线程池-各个参数的含义- 阿里,美团,京东面试题目
阿里的面试官问了个问题,如果corepollSize=10,MaxPollSize=20,如果来了25个线程 怎么办, 答案: 当一个任务通过execute(Runnable)方法欲添加到线程池时: ...
- C++多线程以及线程池
1 线程 1.1 简介 线程(英语:thread)是操作系统能够进行运算调度的最小单位.大部分情况下,它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程 ...
- Java 并发编程 -- 线程池源码实战
一.概述 小编在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都 ...
- Java的多线程和线程池的使用,你真的清楚了吗?
Java的多线程和线程池的使用 多线程大大提高程序运行效率,我们在开发过程中经常会开启一个线程来执行一些费时的任务.开启一个线程有4种方式,在下面的文章我将详细的去讲解. 继承Thread 继承Thr ...
- Java多线程之线程池配置合理线程数
Java多线程之线程池配置合理线程数 目录 代码查看公司服务器或阿里云是几核的 合理线程数配置之CPU密集型 合理线程数配置之IO密集型 1. 代码查看公司服务器或阿里云是几核的 要合理配置线程数首先 ...
- Java多线程之线程池的手写改造和拒绝策略
Java多线程之线程池的手写改造和拒绝策略 目录 自定义线程池的使用 四种拒绝策略代码体现 1. 自定义线程池的使用 自定义线程池(拒绝策略默认AbortPolicy) public class My ...
- Java多线程之线程池7大参数、底层工作原理、拒绝策略详解
Java多线程之线程池7大参数详解 目录 企业面试题 线程池7大参数源码 线程池7大参数详解 底层工作原理详解 线程池的4种拒绝策略理论简介 面试的坑:线程池实际中使用哪一个? 1. 企业面试题 蚂蚁 ...
- Java多线程之线程池详解
Java多线程之线程池详解 目录: 线程池使用及优势 线程池3个常用方式 线程池7大参数深入介绍 线程池底层工作原理 1. 线程池使用及优势 线程池做的工作主要是控制运行的线程的数量,处理过程中将任务 ...
- pool python 传参数_Python-爬虫-多线程、线程池模拟(urllib、requests、UserAgent、超时等)...
接着之前的MonkeyLei:Python-爬取页面内容(涉及urllib.requests.UserAgent.Json等) 继续练习下多线程,线程池模拟.. 我想这样: 1. 创建一个线程池,线程 ...
最新文章
- vue 组件属性监听_Vue.js 监听属性
- Django(part36)--cookies
- [html] 实现一个居中半透明的模态窗
- c语言中实现自动平移,c语言实现图像的旋转与平移
- 应对DDOS***需要“多管齐下”
- 剑指offer所有的题目总结(转)
- 周末巨献:100+诡异的数据集,20万Eclipse Bug、死囚遗言
- java成员方法tostring_Java 工具类-toString
- Java代码规范之编程规约
- 小米笔记本重装win10系统教程
- 实时网速怎么看快慢_iQOO怎么显示网速 网络状态实时查看
- java并发编程(三)--java中的锁(Lock接口和队列同步器AQS)
- 智能汽车域控制器的认识
- 四、模拟英语四六级答题卡识别阅卷评分
- 二次函数回归方程_高三||【高三专题】三角函数提优专题卷
- re python 引擎_python 详解re模块
- Springboot中Bean的具体含义
- CAJ如何转化为PDF文件
- 基于JAVA口红专卖网站计算机毕业设计源码+数据库+lw文档+系统+部署
- 计算机专业近几年的参考文献,近几年计算机专业发参考文献 计算机专业发专著类参考文献哪里找...