多线程编程—线程池的实现

执行与任务分离的组件— 线程池

https://github.com/wangbojing/threadpool

多线程技术主要解决了处理器单元内多个线程执行的问题,它可以显著的减少处理器单元的闲置时间,增加处理器单元的吞吐能力。线程池是多线程编程的一个必要组件,并且对于很多编程人员都是透明的,更是神秘的。有幸能为大家解析其中缘由,尚有不妥之处,欢迎大家抛砖。

线程池的概念,是一个用来管理一组执行任务线程的工具。既然是管理工具,那么该工具管理是用来管理任务与执行的。如图一线程池组件拓扑图,执行队列(Workers),任务队列(Jobs)和池管理(Pool Manager)三部分组成。

执行队列(Workers)是用来存放运行线程的队列。

任务队列(Jobs)是用来存放需要被执行的任务队列。

池管理(Pool Manager)主要是管理执行队列的执行顺序,执行任务的时间长短,对长时间没有使用的执行单元进行释放,执行单元满负荷运行的时及时添加执行单元;记录未执行的任务数量,对新任务入队,即将执行的任务出队等等。

图一 线程池组件拓扑图

执行队列(Workers)中的每一个执行单元(Worker)由哪些元素组成?线程ID,退出标志。

任务队列(Jobs)中的每一个任务(Jobs)的组成元素?执行每一个任务的具体执行函数,每一个任务的执行参数。

池管理(Pool Manager)由哪些元素组成?每一个新任务添加与执行时的移除用的互斥锁,每一个线程挂起的时所等待的条件变量。

根据分析如图二线程池的类图。

图二线程池的类图

到这里一个简单的线程池就已经可以呼之欲出了。以下为实现代码

/** Author: WangBoJing* email: 1989wangbojing@gmail.com * github: https://github.com/wangbojing*/
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>#define LL_ADD(item, list) do { \item->prev = NULL;   \item->next = list;   \list = item;    \
} while(0)#define LL_REMOVE(item, list) do {        \if (item->prev != NULL) item->prev->next = item->next; \if (item->next != NULL) item->next->prev = item->prev; \if (list == item) list = item->next;     \item->prev = item->next = NULL;       \
} while(0)typedef void (*JOB_CALLBACK)(void *);
struct NTHREADPOOL;typedef struct NWORKER {pthread_t thread;int terminate;struct NTHREADPOOL *pool;struct NWORKER *next;struct NWORKER *prev;
} nWorker;typedef struct NJOB {JOB_CALLBACK job_func;void *arg;struct NJOB *next;struct NJOB *prev;
} nJob;typedef struct NTHREADPOOL {struct NWORKER *workers;struct NJOB *jobs;pthread_mutex_t jobs_mtx;pthread_cond_t jobs_cond;
} nThreadPool;void *ntyWorkerThread(void *arg) {nWorker *worker = (nWorker*)arg;while (1) {pthread_mutex_lock(&worker->pool->jobs_mtx);while (worker->pool->jobs == NULL) {if (worker->terminate) break;pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);}if (worker->terminate) {pthread_mutex_unlock(&worker->pool->jobs_mtx);break;}nJob *job = worker->pool->jobs;if (job != NULL) {LL_REMOVE(job, worker->pool->jobs);}pthread_mutex_unlock(&worker->pool->jobs_mtx);if (job == NULL) continue;job->job_func(job);usleep(1);}free(worker);pthread_exit(NULL);}int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {if (pool == NULL) return 1;if (numWorkers < 1) numWorkers = 1;memset(pool, 0, sizeof(nThreadPool));pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));int i = 0;for (i = 0;i < numWorkers;i ++) {nWorker *worker = (nWorker*)malloc(sizeof(nWorker));if (worker == NULL) {perror("malloc");return 1;}memset(worker, 0, sizeof(nWorker));worker->pool = pool;int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void*)worker);if (ret) {perror("pthread_create");free(worker);return 1;}LL_ADD(worker, worker->pool->workers);}
}void ntyThreadPoolShutdown(nThreadPool *pool) {nWorker *worker = NULL;for (worker = pool->workers;worker != NULL;worker = worker->next) {worker->terminate = 1;}pthread_mutex_lock(&pool->jobs_mtx);pool->workers = NULL;pool->jobs = NULL;pthread_cond_broadcast(&pool->jobs_cond);pthread_mutex_unlock(&pool->jobs_mtx);}void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {pthread_mutex_lock(&pool->jobs_mtx);LL_ADD(job, pool->jobs);pthread_cond_signal(&pool->jobs_cond);pthread_mutex_unlock(&pool->jobs_mtx);}/********************************* debug thread pool *********************************/#define KING_MAX_THREADS  80
#define KING_COUNTER_SIZE 1000void king_counter(void *arg) {nJob *job = (nJob*)arg;int index = *(int *)job->arg;printf("index: %d, selfid:%lu\n", index, pthread_self());free(job->arg);free(job);
}int main(int argc, char *argv[]) {nThreadPool pool;ntyThreadPoolCreate(&pool, KING_MAX_THREADS);int i = 0;for (i = 0;i < KING_COUNTER_SIZE;i ++) {nJob *job = (nJob*)malloc(sizeof(nJob));if (job == NULL) {perror("malloc");exit(1);}job->job_func = king_counter;job->arg = malloc(sizeof(int));*(int*)job->arg = i;ntyThreadPoolPush(&pool, job);}getchar();printf("You are very good !!!!\n");}

这样的线程池还是只是一个Demo,原因有如下几点需要我们值得改进的。

  1. 线程池的线程数量是确定的,不能随着系统任务请求数量放缩线程池的大小。

  2. 任务数量的统计,并没有对任务队列进行统计

  3. 执行任务中的线程数量,等待执行的任务数量进行统计

  4. 每一个执行任务的时间没有做限制,

  5. IO密集型与计算密集型区分,线程池非常常用,但是根据不同的业务场景需要设置不同配置

  6. 在用户任务执行函数里,用户主动的调用了pthread_exit退出线程的保护机制

针对于以上几点问题,改进了一版线程池

/** Author: WangBoJing* email: 1989wangbojing@gmail.com * github: https://github.com/wangbojing*/#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>typedef void (*JOB_CALLBACK)(void *);typedef struct NJOB {struct NJOB *next;JOB_CALLBACK func;void *arg;
} nJob;typedef struct NWORKER {struct NWORKER *active_next;pthread_t active_tid;
} nWorker;typedef struct NTHREADPOOL {struct NTHREADPOOL *forw;struct NTHREADPOOL *back;pthread_mutex_t mtx;pthread_cond_t busycv;pthread_cond_t workcv;pthread_cond_t waitcv;nWorker *active;nJob *head;nJob *tail;pthread_attr_t attr;int flags;unsigned int linger;int minimum;int maximum;int nthreads;int idle;} nThreadPool;static void* ntyWorkerThread(void *arg);
#define NTY_POOL_WAIT   0x01
#define NTY_POOL_DESTROY  0x02static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static sigset_t fillset;
nThreadPool *thread_pool = NULL;static int ntyWorkerCreate(nThreadPool *pool) {sigset_t oset;pthread_t thread_id;pthread_sigmask(SIG_SETMASK, &fillset, &oset);int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool);pthread_sigmask(SIG_SETMASK, &oset, NULL);return error;
}static void ntyWorkerCleanup(nThreadPool * pool) {--pool->nthreads;if (pool->flags & NTY_POOL_DESTROY) {if (pool->nthreads == 0) {pthread_cond_broadcast(&pool->busycv);}} else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {pool->nthreads ++;}pthread_mutex_unlock(&pool->mtx);}static void ntyNotifyWaiters(nThreadPool *pool) {if (pool->head == NULL && pool->active == NULL) {pool->flags &= ~NTY_POOL_WAIT;pthread_cond_broadcast(&pool->waitcv);}}static void ntyJobCleanup(nThreadPool *pool) {pthread_t tid = pthread_self();nWorker *activep;nWorker **activepp;pthread_mutex_lock(&pool->mtx);for (activepp = &pool->active;(activep = *activepp) != NULL;activepp = &activep->active_next) {*activepp = activep->active_next;break;}if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);}static void* ntyWorkerThread(void *arg) {nThreadPool *pool = (nThreadPool*)arg;nWorker active;int timeout;struct timespec ts;JOB_CALLBACK func;pthread_mutex_lock(&pool->mtx);pthread_cleanup_push(ntyWorkerCleanup, pool);active.active_tid = pthread_self();while (1) {pthread_sigmask(SIG_SETMASK, &fillset, NULL);pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);timeout = 0;pool->idle ++;if (pool->flags & NTY_POOL_WAIT) {ntyNotifyWaiters(pool);}while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {if (pool->nthreads <= pool->minimum) {pthread_cond_wait(&pool->workcv, &pool->mtx);} else {clock_gettime(CLOCK_REALTIME, &ts);ts.tv_sec += pool->linger;if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {timeout = 1;break;}}}pool->idle --;if (pool->flags & NTY_POOL_DESTROY) break;nJob *job = pool->head;  if (job != NULL) {timeout = 0;func = job->func;void *job_arg = job->arg;pool->head = job->next;if (job == pool->tail) {pool->tail == NULL;}active.active_next = pool->active;pool->active = &active;pthread_mutex_unlock(&pool->mtx);pthread_cleanup_push(ntyJobCleanup, pool);free(job);func(job_arg);pthread_cleanup_pop(1);}if (timeout && (pool->nthreads > pool->minimum)) {break;}}pthread_cleanup_pop(1);return NULL;}static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {struct sched_param param;void *addr;size_t size;int value;pthread_attr_init(new_attr);if (old_attr != NULL) {pthread_attr_getstack(old_attr, &addr, &size);pthread_attr_setstack(new_attr, NULL, size);pthread_attr_getscope(old_attr, &value);pthread_attr_setscope(new_attr, value);pthread_attr_getinheritsched(old_attr, &value);pthread_attr_setinheritsched(new_attr, value);pthread_attr_getschedpolicy(old_attr, &value);pthread_attr_setschedpolicy(new_attr, value);pthread_attr_getschedparam(old_attr, &param);pthread_attr_setschedparam(new_attr, &param);pthread_attr_getguardsize(old_attr, &size);pthread_attr_setguardsize(new_attr, size);}pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);}nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {sigfillset(&fillset);if (min_threads > max_threads || max_threads < 1) {errno = EINVAL;return NULL;}nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool));if (pool == NULL) {errno = ENOMEM;return NULL;}pthread_mutex_init(&pool->mtx, NULL);pthread_cond_init(&pool->busycv, NULL);pthread_cond_init(&pool->workcv, NULL);pthread_cond_init(&pool->waitcv, NULL);pool->active = NULL;pool->head = NULL;pool->tail = NULL;pool->flags = 0;pool->linger = linger;pool->minimum = min_threads;pool->maximum = max_threads;pool->nthreads = 0;pool->idle = 0;ntyCloneAttributes(&pool->attr, attr);pthread_mutex_lock(&nty_pool_lock);if (thread_pool == NULL) {pool->forw = pool;pool->back = pool;thread_pool = pool;} else {thread_pool->back->forw = pool;pool->forw = thread_pool;pool->back = pool->back;thread_pool->back = pool;}pthread_mutex_unlock(&nty_pool_lock);return pool;}int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {nJob *job = (nJob*)malloc(sizeof(nJob));if (job == NULL) {errno = ENOMEM;return -1;}job->next = NULL;job->func = func;job->arg = arg;pthread_mutex_lock(&pool->mtx);if (pool->head == NULL) {pool->head = job;} else {pool->tail->next = job;}pool->tail = job;if (pool->idle > 0) {pthread_cond_signal(&pool->workcv);} else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {pool->nthreads ++;}pthread_mutex_unlock(&pool->mtx);
}void nThreadPoolWait(nThreadPool *pool) {pthread_mutex_lock(&pool->mtx);pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);while (pool->head != NULL || pool->active != NULL) {pool->flags |= NTY_POOL_WAIT;pthread_cond_wait(&pool->waitcv, &pool->mtx);}pthread_cleanup_pop(1);
}void nThreadPoolDestroy(nThreadPool *pool) {nWorker *activep;nJob *job;pthread_mutex_lock(&pool->mtx);pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);pool->flags |= NTY_POOL_DESTROY;pthread_cond_broadcast(&pool->workcv);for (activep = pool->active;activep != NULL;activep = activep->active_next) {pthread_cancel(activep->active_tid);}while (pool->nthreads != 0) {pthread_cond_wait(&pool->busycv, &pool->mtx);}pthread_cleanup_pop(1);pthread_mutex_lock(&nty_pool_lock);if (thread_pool == pool) {thread_pool = pool->forw;}if (thread_pool == pool) {thread_pool = NULL;} else {pool->back->forw = pool->forw;pool->forw->back = pool->back;}pthread_mutex_unlock(&nty_pool_lock);for (job = pool->head;job != NULL;job = pool->head) {pool->head = job->next;free(job);}pthread_attr_destroy(&pool->attr);free(pool);}/********************************* debug thread pool *********************************/void king_counter(void *arg) {int index = *(int*)arg;printf("index : %d, selfid : %lu\n", index, pthread_self());free(arg);usleep(1);
}#define KING_COUNTER_SIZE 1000int main(int argc, char *argv[]) {nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);int i = 0;for (i = 0;i < KING_COUNTER_SIZE;i ++) {int *index = (int*)malloc(sizeof(int));memset(index, 0, sizeof(int));memcpy(index, &i, sizeof(int));ntyThreadPoolQueue(pool, king_counter, index);}getchar();printf("You are very good !!!!\n");
}

转载于:https://blog.51cto.com/wangbojing/1968999

多线程编程—线程池的实现相关推荐

  1. 多线程之线程池-各个参数的含义- 阿里,美团,京东面试题目

    阿里的面试官问了个问题,如果corepollSize=10,MaxPollSize=20,如果来了25个线程 怎么办, 答案: 当一个任务通过execute(Runnable)方法欲添加到线程池时: ...

  2. C++多线程以及线程池

    1 线程 1.1 简介   线程(英语:thread)是操作系统能够进行运算调度的最小单位.大部分情况下,它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程 ...

  3. Java 并发编程 -- 线程池源码实战

    一.概述 小编在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都 ...

  4. Java的多线程和线程池的使用,你真的清楚了吗?

    Java的多线程和线程池的使用 多线程大大提高程序运行效率,我们在开发过程中经常会开启一个线程来执行一些费时的任务.开启一个线程有4种方式,在下面的文章我将详细的去讲解. 继承Thread 继承Thr ...

  5. Java多线程之线程池配置合理线程数

    Java多线程之线程池配置合理线程数 目录 代码查看公司服务器或阿里云是几核的 合理线程数配置之CPU密集型 合理线程数配置之IO密集型 1. 代码查看公司服务器或阿里云是几核的 要合理配置线程数首先 ...

  6. Java多线程之线程池的手写改造和拒绝策略

    Java多线程之线程池的手写改造和拒绝策略 目录 自定义线程池的使用 四种拒绝策略代码体现 1. 自定义线程池的使用 自定义线程池(拒绝策略默认AbortPolicy) public class My ...

  7. Java多线程之线程池7大参数、底层工作原理、拒绝策略详解

    Java多线程之线程池7大参数详解 目录 企业面试题 线程池7大参数源码 线程池7大参数详解 底层工作原理详解 线程池的4种拒绝策略理论简介 面试的坑:线程池实际中使用哪一个? 1. 企业面试题 蚂蚁 ...

  8. Java多线程之线程池详解

    Java多线程之线程池详解 目录: 线程池使用及优势 线程池3个常用方式 线程池7大参数深入介绍 线程池底层工作原理 1. 线程池使用及优势 线程池做的工作主要是控制运行的线程的数量,处理过程中将任务 ...

  9. pool python 传参数_Python-爬虫-多线程、线程池模拟(urllib、requests、UserAgent、超时等)...

    接着之前的MonkeyLei:Python-爬取页面内容(涉及urllib.requests.UserAgent.Json等) 继续练习下多线程,线程池模拟.. 我想这样: 1. 创建一个线程池,线程 ...

最新文章

  1. vue 组件属性监听_Vue.js 监听属性
  2. Django(part36)--cookies
  3. [html] 实现一个居中半透明的模态窗
  4. c语言中实现自动平移,c语言实现图像的旋转与平移
  5. 应对DDOS***需要“多管齐下”
  6. 剑指offer所有的题目总结(转)
  7. 周末巨献:100+诡异的数据集,20万Eclipse Bug、死囚遗言
  8. java成员方法tostring_Java 工具类-toString
  9. Java代码规范之编程规约
  10. 小米笔记本重装win10系统教程
  11. 实时网速怎么看快慢_iQOO怎么显示网速 网络状态实时查看
  12. java并发编程(三)--java中的锁(Lock接口和队列同步器AQS)
  13. 智能汽车域控制器的认识
  14. 四、模拟英语四六级答题卡识别阅卷评分
  15. 二次函数回归方程_高三||【高三专题】三角函数提优专题卷
  16. re python 引擎_python 详解re模块
  17. Springboot中Bean的具体含义
  18. CAJ如何转化为PDF文件
  19. 基于JAVA口红专卖网站计算机毕业设计源码+数据库+lw文档+系统+部署
  20. 计算机专业近几年的参考文献,近几年计算机专业发参考文献 计算机专业发专著类参考文献哪里找...

热门文章

  1. Java多线程学习笔记-线程的使用
  2. 51单片机学习笔记(清翔版)(13)——LED点阵、74HC595
  3. nginx入门(4):FastCGI代理
  4. App流量测试--使用安卓自身提供的TCP收发长度统计功能
  5. Controller和RequestMapping
  6. 牛掰本机限速软件appband
  7. 还是觉得应该动手写点东西....
  8. DataGrid的多行提交
  9. MongoDB学习之在Windows下安装MongoDB
  10. Linux CentOs6 命令学习