Table of Contents

A simple C thread pool implementation

Possible enhancements

Principle

Code

threadpool.h

threadpool.c

Test cases:

heavy.c

shutdown.c

thrdtest.c


A simple C thread pool implementation

Currently, the implementation:

  • Works with pthreads only, but the API is intentionally opaque to allow other implementations (Windows, for instance).
  • Starts all threads on creation of the thread pool.
  • Reserves one task for signaling the queue is full.
  • Stops and joins all worker threads on destroy.

Possible enhancements

The API contains additional, currently unused 'flags' parameters that would allow some further options:

  • Lazy creation of threads (easy)
  • Reduce number of threads automatically (hard)
  • Unlimited queue size (medium)
  • Kill worker threads on destroy (hard, dangerous)
  • Support Windows API (medium)
  • Reduce locking contention (medium/hard)
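As a rough illustration of the "Unlimited queue size" item, the fixed array could be replaced by a singly linked list that grows on demand. The sketch below is not part of the library: the names task_node_t, task_list_t, task_list_push, task_list_pop and bump_counter are hypothetical, and locking is left out on the assumption that the pool's existing mutex would still guard every call.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical types and functions for illustration; not in threadpool.h. */
typedef struct task_node {
    void (*function)(void *);
    void *argument;
    struct task_node *next;
} task_node_t;

typedef struct {
    task_node_t *head;   /* oldest task, next to run */
    task_node_t *tail;   /* newest task */
    int count;           /* number of pending tasks */
} task_list_t;

/* Append a task. Returns 0 on success, -1 if malloc fails. */
int task_list_push(task_list_t *q, void (*function)(void *), void *argument)
{
    task_node_t *node = malloc(sizeof(*node));
    if (node == NULL) {
        return -1;
    }
    node->function = function;
    node->argument = argument;
    node->next = NULL;
    if (q->tail != NULL) {
        q->tail->next = node;
    } else {
        q->head = node;
    }
    q->tail = node;
    q->count++;
    return 0;
}

/* Copy the oldest task into *out and unlink it.
   Returns 0 on success, -1 if the list is empty. */
int task_list_pop(task_list_t *q, task_node_t *out)
{
    task_node_t *node = q->head;
    if (node == NULL) {
        return -1;
    }
    *out = *node;
    out->next = NULL;
    q->head = node->next;
    if (q->head == NULL) {
        q->tail = NULL;
    }
    q->count--;
    assert(q->count >= 0);
    free(node);
    return 0;
}

/* Hypothetical sample task: increments the int that arg points to. */
void bump_counter(void *arg)
{
    *(int *)arg += 1;
}
```

The trade-off is one malloc and free per task, and the only upper bound on pending work is available memory, which may be why the README rates this change as "medium" rather than "easy".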

https://github.com/mbrossard/threadpool


Principle

Figure source: (https://mp.weixin.qq.com/s/O5Ubr9nyUm7os4M6BJGYDA)

Related technical articles

https://blog.csdn.net/qq_36359022/article/details/78796784

https://mp.weixin.qq.com/s/O5Ubr9nyUm7os4M6BJGYDA

Code

threadpool.h

/*
 * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @file threadpool.h
 * @brief Threadpool Header File
 */

/**
 * Increase this constants at your own risk
 * Large values might slow down your system
 */
#define MAX_THREADS 64
#define MAX_QUEUE 65536

typedef struct threadpool_t threadpool_t;

typedef enum {
    threadpool_invalid        = -1,
    threadpool_lock_failure   = -2,
    threadpool_queue_full     = -3,
    threadpool_shutdown       = -4,
    threadpool_thread_failure = -5
} threadpool_error_t;

typedef enum {
    threadpool_graceful       = 1
} threadpool_destroy_flags_t;

/**
 * @function threadpool_create
 * @brief Creates a threadpool_t object.
 * @param thread_count Number of worker threads.
 * @param queue_size   Size of the queue.
 * @param flags        Unused parameter.
 * @return a newly created thread pool or NULL
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);

/**
 * @function threadpool_add
 * @brief add a new task in the queue of a thread pool
 * @param pool     Thread pool to which add the task.
 * @param function Pointer to the function that will perform the task.
 * @param argument Argument to be passed to the function.
 * @param flags    Unused parameter.
 * @return 0 if all goes well, negative values in case of error (@see
 * threadpool_error_t for codes).
 */
int threadpool_add(threadpool_t *pool, void (*routine)(void *),
                   void *arg, int flags);

/**
 * @function threadpool_destroy
 * @brief Stops and destroys a thread pool.
 * @param pool  Thread pool to destroy.
 * @param flags Flags for shutdown
 *
 * Known values for flags are 0 (default) and threadpool_graceful in
 * which case the thread pool doesn't accept any new tasks but
 * processes all pending tasks before shutdown.
 */
int threadpool_destroy(threadpool_t *pool, int flags);

#ifdef __cplusplus
}
#endif

#endif /* _THREADPOOL_H_ */

threadpool.c

/*
 * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file threadpool.c
 * @brief Threadpool implementation file
 */

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "threadpool.h"

typedef enum {
    immediate_shutdown = 1,
    graceful_shutdown  = 2
} threadpool_shutdown_t;

/**
 *  @struct threadpool_task
 *  @brief the work struct
 *
 *  @var function Pointer to the function that will perform the task.
 *  @var argument Argument to be passed to the function.
 */

typedef struct {
    void (*function)(void *);
    void *argument;
} threadpool_task_t;

/**
 *  @struct threadpool
 *  @brief The threadpool struct
 *
 *  @var notify       Condition variable to notify worker threads.
 *  @var threads      Array containing worker threads ID.
 *  @var thread_count Number of threads
 *  @var queue        Array containing the task queue.
 *  @var queue_size   Size of the task queue.
 *  @var head         Index of the first element.
 *  @var tail         Index of the next element.
 *  @var count        Number of pending tasks
 *  @var shutdown     Flag indicating if the pool is shutting down
 *  @var started      Number of started threads
 */
struct threadpool_t {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    pthread_t *threads;
    threadpool_task_t *queue;
    int thread_count;
    int queue_size;
    int head;
    int tail;
    int count;
    int shutdown;
    int started;
};

/**
 * @function void *threadpool_thread(void *threadpool)
 * @brief the worker thread
 * @param threadpool the pool which own the thread
 */
static void *threadpool_thread(void *threadpool);

int threadpool_free(threadpool_t *pool);

threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
    threadpool_t *pool;
    int i;
    (void) flags;

    if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) {
        return NULL;
    }

    if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
        goto err;
    }

    /* Initialize */
    pool->thread_count = 0;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;

    /* Allocate thread and task queue */
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
    pool->queue = (threadpool_task_t *)malloc
        (sizeof(threadpool_task_t) * queue_size);

    /* Initialize mutex and conditional variable first */
    if((pthread_mutex_init(&(pool->lock), NULL) != 0) ||
       (pthread_cond_init(&(pool->notify), NULL) != 0) ||
       (pool->threads == NULL) ||
       (pool->queue == NULL)) {
        goto err;
    }

    /* Start worker threads */
    for(i = 0; i < thread_count; i++) {
        if(pthread_create(&(pool->threads[i]), NULL,
                          threadpool_thread, (void*)pool) != 0) {
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }

    return pool;

 err:
    if(pool) {
        threadpool_free(pool);
    }
    return NULL;
}

int threadpool_add(threadpool_t *pool, void (*function)(void *),
                   void *argument, int flags)
{
    int err = 0;
    int next;
    (void) flags;

    if(pool == NULL || function == NULL) {
        return threadpool_invalid;
    }

    if(pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    next = (pool->tail + 1) % pool->queue_size;

    do {
        /* Are we full ? */
        if(pool->count == pool->queue_size) {
            err = threadpool_queue_full;
            break;
        }

        /* Are we shutting down ? */
        if(pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }

        /* Add task to queue */
        pool->queue[pool->tail].function = function;
        pool->queue[pool->tail].argument = argument;
        pool->tail = next;
        pool->count += 1;

        /* pthread_cond_broadcast */
        if(pthread_cond_signal(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while(0);

    if(pthread_mutex_unlock(&pool->lock) != 0) {
        err = threadpool_lock_failure;
    }

    return err;
}

int threadpool_destroy(threadpool_t *pool, int flags)
{
    int i, err = 0;

    if(pool == NULL) {
        return threadpool_invalid;
    }

    if(pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    do {
        /* Already shutting down */
        if(pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }

        pool->shutdown = (flags & threadpool_graceful) ?
            graceful_shutdown : immediate_shutdown;

        /* Wake up all worker threads */
        if((pthread_cond_broadcast(&(pool->notify)) != 0) ||
           (pthread_mutex_unlock(&(pool->lock)) != 0)) {
            err = threadpool_lock_failure;
            break;
        }

        /* Join all worker thread */
        for(i = 0; i < pool->thread_count; i++) {
            if(pthread_join(pool->threads[i], NULL) != 0) {
                err = threadpool_thread_failure;
            }
        }
    } while(0);

    /* Only if everything went well do we deallocate the pool */
    if(!err) {
        threadpool_free(pool);
    }
    return err;
}

int threadpool_free(threadpool_t *pool)
{
    if(pool == NULL || pool->started > 0) {
        return -1;
    }

    /* Did we manage to allocate ? */
    if(pool->threads) {
        free(pool->threads);
        free(pool->queue);

        /* Because we allocate pool->threads after initializing the
           mutex and condition variable, we're sure they're
           initialized. Let's lock the mutex just in case. */
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }
    free(pool);
    return 0;
}

static void *threadpool_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    for(;;) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }

        if((pool->shutdown == immediate_shutdown) ||
           ((pool->shutdown == graceful_shutdown) &&
            (pool->count == 0))) {
            break;
        }

        /* Grab our task */
        task.function = pool->queue[pool->head].function;
        task.argument = pool->queue[pool->head].argument;
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));

        /* Get to work */
        (*(task.function))(task.argument);
    }

    pool->started--;

    pthread_mutex_unlock(&(pool->lock));
    pthread_exit(NULL);
    return(NULL);
}
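The queue bookkeeping in threadpool_add and threadpool_thread comes down to a circular buffer tracked by head, tail and count. The following single-threaded sketch isolates that index arithmetic so it can be followed without the locking. The names ring_t, ring_put, ring_get and RING_SIZE are hypothetical, and plain ints stand in for tasks.

```c
#include <assert.h>

#define RING_SIZE 4   /* hypothetical small capacity, for illustration only */

typedef struct {
    int slots[RING_SIZE];
    int head;    /* index of the oldest stored element */
    int tail;    /* index where the next element will be stored */
    int count;   /* number of stored elements */
} ring_t;

/* Store a value. Returns 0 on success, -1 when the ring is full
   (the case where threadpool_add reports threadpool_queue_full). */
int ring_put(ring_t *r, int value)
{
    if (r->count == RING_SIZE) {
        return -1;
    }
    r->slots[r->tail] = value;
    r->tail = (r->tail + 1) % RING_SIZE;   /* wrap around at the end */
    r->count += 1;
    return 0;
}

/* Fetch the oldest value. Returns 0 on success, -1 when the ring is empty
   (the case where a worker thread would wait on the condition variable). */
int ring_get(ring_t *r, int *value)
{
    if (r->count == 0) {
        return -1;
    }
    assert(r->head >= 0 && r->head < RING_SIZE);
    *value = r->slots[r->head];
    r->head = (r->head + 1) % RING_SIZE;   /* wrap around at the end */
    r->count -= 1;
    return 0;
}
```

Because count is stored separately, head == tail is ambiguous on its own: it holds both when the ring is empty and when it is full, and count tells the two cases apart.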

Test cases:

heavy.c

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>

#include "threadpool.h"

#define THREAD 4
#define SIZE   8192
#define QUEUES 64

/*
 * Warning do not increase THREAD and QUEUES too much on 32-bit
 * platforms: because of each thread (and there will be THREAD *
 * QUEUES of them) will allocate its own stack (8 MB is the default on
 * Linux), you'll quickly run out of virtual space.
 */

threadpool_t *pool[QUEUES];
int tasks[SIZE], left;
pthread_mutex_t lock;

int error;

void dummy_task(void *arg) {
    int *pi = (int *)arg;
    *pi += 1;

    if(*pi < QUEUES) {
        assert(threadpool_add(pool[*pi], &dummy_task, arg, 0) == 0);
    } else {
        pthread_mutex_lock(&lock);
        left--;
        pthread_mutex_unlock(&lock);
    }
}

int main(int argc, char **argv)
{
    int i, copy = 1;

    left = SIZE;
    pthread_mutex_init(&lock, NULL);

    for(i = 0; i < QUEUES; i++) {
        pool[i] = threadpool_create(THREAD, SIZE, 0);
        assert(pool[i] != NULL);
    }

    usleep(10);

    for(i = 0; i < SIZE; i++) {
        tasks[i] = 0;
        assert(threadpool_add(pool[0], &dummy_task, &(tasks[i]), 0) == 0);
    }

    while(copy > 0) {
        usleep(10);
        pthread_mutex_lock(&lock);
        copy = left;
        pthread_mutex_unlock(&lock);
    }

    for(i = 0; i < QUEUES; i++) {
        assert(threadpool_destroy(pool[i], 0) == 0);
    }

    pthread_mutex_destroy(&lock);

    return 0;
}
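The warning in the heavy.c comment can be checked with simple arithmetic. The helper below is a hypothetical illustration, not part of the test program. It multiplies the number of threads by the stack size, taking the 8 MB Linux default quoted in the comment as an assumption, since the real value depends on the system's limits.

```c
#include <assert.h>

/* Hypothetical helper: total virtual address space reserved for thread
   stacks when every pool starts all of its worker threads up front. */
unsigned long long stack_reservation_bytes(unsigned int threads_per_pool,
                                           unsigned int pools,
                                           unsigned long long stack_bytes)
{
    assert(stack_bytes > 0);
    return (unsigned long long)threads_per_pool * pools * stack_bytes;
}
```

With the values from heavy.c (THREAD = 4, QUEUES = 64) and 8 MiB per stack, stack_reservation_bytes(4, 64, 8ULL * 1024 * 1024) gives 2147483648 bytes. That is 2 GiB for 256 threads, or half of the 4 GiB that a 32-bit address space can describe at all.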

shutdown.c

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>

#include "threadpool.h"

#define THREAD 4
#define SIZE   8192

threadpool_t *pool;
int left;
pthread_mutex_t lock;

int error;

void dummy_task(void *arg) {
    usleep(100);
    pthread_mutex_lock(&lock);
    left--;
    pthread_mutex_unlock(&lock);
}

int main(int argc, char **argv)
{
    int i;

    pthread_mutex_init(&lock, NULL);

    /* Testing immediate shutdown */
    left = SIZE;
    pool = threadpool_create(THREAD, SIZE, 0);
    for(i = 0; i < SIZE; i++) {
        assert(threadpool_add(pool, &dummy_task, NULL, 0) == 0);
    }
    assert(threadpool_destroy(pool, 0) == 0);
    assert(left > 0);

    /* Testing graceful shutdown */
    left = SIZE;
    pool = threadpool_create(THREAD, SIZE, 0);
    for(i = 0; i < SIZE; i++) {
        assert(threadpool_add(pool, &dummy_task, NULL, 0) == 0);
    }
    assert(threadpool_destroy(pool, threadpool_graceful) == 0);
    assert(left == 0);

    pthread_mutex_destroy(&lock);

    return 0;
}
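The two assertions in shutdown.c (left > 0 after an immediate shutdown, left == 0 after a graceful one) follow from the exit test in the worker loop of threadpool.c. The sketch below restates that test as a standalone predicate. The function name worker_should_exit is hypothetical. The enum values are copied from threadpool_shutdown_t.

```c
#include <assert.h>

/* Values copied from threadpool_shutdown_t in threadpool.c. */
enum { immediate_shutdown = 1, graceful_shutdown = 2 };

/* Hypothetical helper: returns 1 when a worker that holds the lock should
   leave its loop, given the shutdown flag and the number of pending tasks. */
int worker_should_exit(int shutdown, int pending)
{
    assert(pending >= 0);
    return (shutdown == immediate_shutdown) ||
           ((shutdown == graceful_shutdown) && (pending == 0));
}
```

An immediate shutdown makes workers exit even with tasks still queued, so some of the 8192 tasks never run. A graceful shutdown keeps workers running until the pending count reaches zero.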

thrdtest.c

#define THREAD 32
#define QUEUE  256

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>

#include "threadpool.h"

int tasks = 0, done = 0;
pthread_mutex_t lock;

void dummy_task(void *arg) {
    usleep(10000);
    pthread_mutex_lock(&lock);
    done++;
    pthread_mutex_unlock(&lock);
}

int main(int argc, char **argv)
{
    threadpool_t *pool;

    pthread_mutex_init(&lock, NULL);

    assert((pool = threadpool_create(THREAD, QUEUE, 0)) != NULL);
    fprintf(stderr, "Pool started with %d threads and "
            "queue size of %d\n", THREAD, QUEUE);

    while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) {
        pthread_mutex_lock(&lock);
        tasks++;
        pthread_mutex_unlock(&lock);
    }

    fprintf(stderr, "Added %d tasks\n", tasks);

    while((tasks / 2) > done) {
        usleep(10000);
    }
    assert(threadpool_destroy(pool, 0) == 0);
    fprintf(stderr, "Did %d tasks\n", done);

    return 0;
}
