Libuv源码分析 —— 8. 线程池
网络I/O
- 在 上一节 的学习中,我们已经搞明白了网络I/O的基本过程,并通过了解进程/线程间通信来熟悉这个流程。下面,让咱们学习线程池中的线程如何工作、并和主进程进行通信的吧!
线程池
- Libuv 是基于事件驱动的异步库。对于耗时的操作。如果在 Libuv 的主循环里执行的话, 就会阻塞后面的任务执行。所以 Libuv 里维护了一个线程池。他负责处理 Libuv 中耗时 的操作,比如文件 io、dns、用户自定义的耗时任务(文件 io 因为存在跨平台兼容的问 题。无法很好地在事件驱动模块实现异步 io)
- 线程池是全局的,并且在所有事件循环中共享
Thread pool work scheduling
数据类型
uv_work_t
工作请求类型。
void
(*uv_work_cb)
( uv_work_t * req )
void
(*uv_after_work_cb)
( uv_work_t * req , int 状态)uv_queue_work()
在线程池上的工作完成后,将在循环线程上调用的回调。如果工作被取消使用状态将是。uv_cancel()
UV_ECANCELED
API
int
uv_queue_work
(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb)初始化一个工作请求,它将在线程池中的一个线程中运行给定的work_cb。一旦work_cb完成,将在循环线程上调用after_work_cb 。
可以使用 取消此请求
uv_cancel()
。
example
- 例
#include <stdio.h> #include <stdlib.h> #include <unistd.h>#include <uv.h>#define FIB_UNTIL 25 uv_loop_t *loop;long fib_(long t) {if (t == 0 || t == 1)return 1;elsereturn fib_(t-1) + fib_(t-2); }// 将在不同的函数中运行 void fib(uv_work_t *req) {int n = *(int *) req->data;if (random() % 2)sleep(1);elsesleep(3);long fib = fib_(n);fprintf(stderr, "%dth fibonacci is %lu\n", n, fib); }void after_fib(uv_work_t *req, int status) {fprintf(stderr, "Done calculating %dth fibonacci\n", *(int *) req->data); }/*我们将要执行fibonacci数列,并且睡眠一段时间,将阻塞和cpu占用时间长的任务分配到不同的线程,使得其不会阻塞event loop上的其他任务 */ int main() {loop = uv_default_loop();int data[FIB_UNTIL];uv_work_t req[FIB_UNTIL]; // 子线程的参数int i;for (i = 0; i < FIB_UNTIL; i++) {data[i] = i;// 可以通过void *data传递任何数据,使用它来完成线程之间的沟通任务req[i].data = (void *) &data[i];uv_queue_work(loop, &req[i], fib, after_fib);}return uv_run(loop, UV_RUN_DEFAULT); }/*执行结果0th fibonacci is 12th fibonacci is 23th fibonacci is 3Done calculating 0th fibonacciDone calculating 2th fibonacciDone calculating 3th fibonacci4th fibonacci is 55th fibonacci is 8Done calculating 4th fibonacciDone calculating 5th fibonacci1th fibonacci is 1Done calculating 1th fibonacci8th fibonacci is 34Done calculating 8th fibonacci9th fibonacci is 55Done calculating 9th fibonacci6th fibonacci is 13Done calculating 6th fibonacci11th fibonacci is 144Done calculating 11th fibonacci7th fibonacci is 21Done calculating 7th fibonacci13th fibonacci is 377Done calculating 13th fibonacci14th fibonacci is 610Done calculating 14th fibonacci10th fibonacci is 89Done calculating 10th fibonacci12th fibonacci is 233Done calculating 12th fibonacci15th fibonacci is 987Done calculating 15th fibonacci16th fibonacci is 159717th fibonacci is 2584Done calculating 16th fibonacciDone calculating 17th fibonacci18th fibonacci is 4181Done calculating 18th fibonacci20th fibonacci is 10946Done calculating 20th fibonacci22th fibonacci is 28657Done calculating 22th fibonacci23th fibonacci is 46368Done calculating 23th fibonacci19th fibonacci is 6765Done calculating 19th fibonacci21th fibonacci is 17711Done calculating 21th fibonacci24th fibonacci is 75025Done calculating 24th fibonacci */
线程间的同步原语
Mutex锁
互斥锁用于对资源的互斥访问,当你访问的内存资源可能被别的线程访问到,这个时候你就可以考虑使用互斥锁,在访问的时候锁住。对应的使用流程可能是这样的:
- 初始化互斥锁:uv_mutex_init(uv_mutex_t* handle)
- 锁住互斥资源:uv_mutex_lock(uv_mutex_t* handle)
- 解锁互斥资源:uv_mutex_unlock(uv_mutex_t* handle)
读写锁
信号量
信号量是一种专门用于提供不同进程间或线程间同步手段的原语。信号量本质上是一个非负整数计数器,代表共享资源的数目,通常是用来控制对共享资源的访问。一般使用步骤是这样的:
- 初始化信号量:int uv_sem_init(uv_sem_t* sem, unsigned int value)
- 信号量加1:void uv_sem_wait(uv_sem_t* sem)
- 信号量减1:void uv_sem_post(uv_sem_t* sem)
- 信号量销毁:void uv_sem_wait(uv_sem_t* sem)
条件变量
- 初始化条件变量:int uv_cond_init(uv_cond_t* cond)
- 线程阻塞等待被唤醒:void uv_cond_wait(uv_cond_t cond, uv_mutex_t mutex)
- 别的线程唤醒阻塞的线程:void uv_cond_signal(uv_cond_t* cond)
屏障
- 初始化屏障需要达到的个数:int uv_barrier_init(uv_barrier_t* barrier, unsigned int count)
- 每当达到条件便将计数+1:int uv_barrier_wait(uv_barrier_t* barrier)
- 销毁屏障:void uv_barrier_destroy(uv_barrier_t* barrier)
源码解析
init_threads
- 线程池的初始化主要是初始化一些数据结构,然后创建多个线程。接着在每个线程里执行 worker 函数
- 源码
static void init_threads(void) {unsigned int i;const char* val;uv_sem_t sem;// 默认线程数 4 个,static uv_thread_t default_threads[4];nthreads = ARRAY_SIZE(default_threads);// 判断用户是否在环境变量中设置了线程数,是的话取用户定义的val = getenv("UV_THREADPOOL_SIZE");if (val != NULL)nthreads = atoi(val);if (nthreads == 0)nthreads = 1;// #define MAX_THREADPOOL_SIZE 128 最多 128 个线程if (nthreads > MAX_THREADPOOL_SIZE)nthreads = MAX_THREADPOOL_SIZE;threads = default_threads;// 超过默认大小,重新分配内存if (nthreads > ARRAY_SIZE(default_threads)) {threads = uv__malloc(nthreads * sizeof(threads[0]));// 分配内存失败,回退到默认if (threads == NULL) {nthreads = ARRAY_SIZE(default_threads);threads = default_threads;}}// 初始化条件变量if (uv_cond_init(&cond))abort();// 初始化互斥变量if (uv_mutex_init(&mutex))abort();// 初始化三个队列QUEUE_INIT(&wq);QUEUE_INIT(&slow_io_pending_wq);QUEUE_INIT(&run_slow_work_message);// 初始化信号量变量,值为 0if (uv_sem_init(&sem, 0))abort();// 创建多个线程,工作函数为 worker,sem 为 worker 入参for (i = 0; i < nthreads; i++)if (uv_thread_create(threads + i, worker, &sem))abort();// 为 0 则阻塞,非 0 则减一,这里等待所有线程启动成功再往下执行for (i = 0; i < nthreads; i++)uv_sem_wait(&sem);uv_sem_destroy(&sem); }
uv__work_submit — 给线程池提交一个任务
- uv_work
struct uv__work {void (*work)(struct uv__work *w);void (*done)(struct uv__work *w, int status);struct uv_loop_s* loop;void* wq[2]; };
- 源码
void uv__work_submit(uv_loop_t* loop,struct uv__work* w,enum uv__work_kind kind,void (*work)(struct uv__work* w),void (*done)(struct uv__work* w, int status)) {// 保证已经初始化线程,并只执行一次,所以线程池是在提交第一个任务的时候才被初始化uv_once(&once, init_once);w->loop = loop;w->work = work;w->done = done;// 调 post 往线程池的队列中加入一个新的任务。Libuv 把任务分为三种类型,慢// io(dns 解析)、快 io(文件操作)、cpu 密集型等,kind 就是说明任务的类型的post(&w->wq, kind);
}static void init_once(void) {#ifndef _WIN32if (pthread_atfork(NULL, NULL, &reset_once))abort();#endifinit_threads();
}// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {// 加锁访问任务队列,因为这个队列是线程池共享的uv_mutex_lock(&mutex);// 类型是慢 IOif (kind == UV__WORK_SLOW_IO) {/* 插入慢 IO 对应的队列,llibuv 这个版本把任务分为几种类型,对于慢 io 类型的任务,libuv 是往任务队列里面插入一个特殊的节点run_slow_work_message,然后用 slow_io_pending_wq 维护了一个慢 io 任务的队列,当处理到 run_slow_work_message 这个节点的时候,libuv 会从 slow_io_pending_wq队列里逐个取出任务节点来执行。*/QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);/*有慢 IO 任务的时候,需要给主队列 wq 插入一个消息节点 run_slow_work_message,说明有慢 IO 任务,所以如果 run_slow_work_message 是空,说明还没有插入主队列。需要进行 q = &run_slow_work_message;赋值,然后把 run_slow_work_message 插入主队列。如果 run_slow_work_message 非空,说明已经插入线程池的任务队列了。解锁然后直接返回。*/if (!QUEUE_EMPTY(&run_slow_work_message)) {/* Running slow I/O tasks is already scheduled => Nothing to do here.The worker that runs said other task will schedule this one as well. */uv_mutex_unlock(&mutex);return;}// 说明 run_slow_work_message 还没有插入队列,准备插入队列q = &run_slow_work_message;}// 把节点插入主队列,可能是慢 IO 消息节点(如果遍历这个队列发现是消息节点// 就可以执行 slow_io_pending_wq 队列里的任务了)或者一般任务(直接执行)QUEUE_INSERT_TAIL(&wq, q);// 有空闲线程则唤醒他,如果大家都在忙,则等到他忙完后就会重新判断是否还有新任务if (idle_threads > 0)uv_cond_signal(&cond);uv_mutex_unlock(&mutex);
}
uv_queue_work — 针对cpu密集型提交一个任务
- 通过 uv_queue_work 提交的任务,是对应一个 request 的。如果该 request 对应的任务没有执行完,则事件循环不会退出。而通过 uv__work_submit 方式提交的任务就算没有执行完,也不会影响事件循环的退出。
- uv_work_t
struct uv_work_t {UV_REQ_FIELDSuv_loop_t* loop;uv_work_cb work_cb;uv_after_work_cb after_work_cb;UV_WORK_PRIVATE_FIELDS
};
- 源码
int uv_queue_work(uv_loop_t* loop,uv_work_t* req,uv_work_cb work_cb,uv_after_work_cb after_work_cb) {if (work_cb == NULL)return UV_EINVAL;// 使 (loop)->active_reqs.count++uv__req_init(loop, req, UV_WORK);req->loop = loop;req->work_cb = work_cb;req->after_work_cb = after_work_cb;uv__work_submit(loop,&req->work_req,UV__WORK_CPU, // 是CPU密集型的uv__queue_work, // 当这个任务被执行的时候。他会执行函数 uv__queue_workuv__queue_done); // 当这个任务执行结束的时候。他会执行函数 uv__queue_donereturn 0; }static void uv__queue_work(struct uv__work* w) {// 通过结构体某字段拿到结构体地址uv_work_t* req = container_of(w, uv_work_t, work_req);req->work_cb(req); }static void uv__queue_done(struct uv__work* w, int err) {uv_work_t* req;req = container_of(w, uv_work_t, work_req);// 使 (loop)->active_reqs.count--uv__req_unregister(req->loop, req);if (req->after_work_cb == NULL)return;req->after_work_cb(req, err); }
worker —— 线程池中的线程执行的函数
- 线程池中把任务分为三种。并且对于慢 io 类型的 任务,还限制了线程数。其余的逻辑和一般的线程池类似,就是互斥访问任务队列,然后取出节点执行,最后执行回调。不过 libuv 这里不是直接回调用户的函数。而是通知主进程。由主进程处理
- 源码
// 该线程池在用户提交了第一个任务的时候初始化,而不是系统启动的时候就初始化 static void worker(void* arg) {struct uv__work* w;QUEUE* q;int is_slow_work;// 线程启动成功,因为初始化线程的时候,等待所有线程都执行成功之后才会往下执行uv_sem_post((uv_sem_t*) arg);arg = NULL;// 加锁互斥访问任务队列uv_mutex_lock(&mutex);for (;;) {/*1 队列为空,2 队列不为空,但是队列里只有慢 IO 任务且正在执行的慢 IO 任务个数达到阈值则空闲线程加一,防止慢 IO 占用过多线程,导致其他快的任务无法得到执行*/while (QUEUE_EMPTY(&wq) ||(QUEUE_HEAD(&wq) == &run_slow_work_message &&QUEUE_NEXT(&run_slow_work_message) == &wq &&slow_io_work_running >= slow_work_thread_threshold())) {idle_threads += 1;// 阻塞,等待队列中有任务的时候唤醒uv_cond_wait(&cond, &mutex);// 被唤醒,开始干活,空闲线程数减一idle_threads -= 1;}// 取出头结点,头指点可能是退出消息、慢 IO,一般请求q = QUEUE_HEAD(&wq);// 如果头结点是退出消息,则结束线程if (q == &exit_message) {// 唤醒其他因为没有任务正阻塞等待任务的线程,别的线程同样取出这个节点,结束线程...// 最后线程会全部结束uv_cond_signal(&cond);uv_mutex_unlock(&mutex);break;}// 移除节点QUEUE_REMOVE(q);// 重置前后指针QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */is_slow_work = 0;/*如果当前节点等于慢 IO 节点,上面的 while 只判断了是不是只有慢 io 任务且达到阈值,这里是任务队列里肯定有非慢 io 任务,可能有慢 io,如果有慢 io 并且正在执行的个数达到阈值,则先不处理该慢 io 任务,继续判断是否还有非慢 io 任务可执行。*/if (q == &run_slow_work_message) {// 遇到阈值,重新入队if (slow_io_work_running >= slow_work_thread_threshold()) {QUEUE_INSERT_TAIL(&wq, q);continue;}// 没有慢 IO 任务则继续if (QUEUE_EMPTY(&slow_io_pending_wq))continue;// 有慢 io,开始处理慢 IO 任务is_slow_work = 1;// 正在处理慢 IO 任务的个数累加,用于其他线程判断慢 IO 任务个数是否达到阈值slow_io_work_running++;// 摘下一个慢 io 任务q = QUEUE_HEAD(&slow_io_pending_wq);QUEUE_REMOVE(q);QUEUE_INIT(q);/*取出一个任务后,如果还有慢 IO 任务则把慢 IO 标记节点重新入队,表示还有慢 IO 任务,因为上面把该标记节点出队了*/if (!QUEUE_EMPTY(&slow_io_pending_wq)) {// 有空闲线程则唤醒他,因为还有任务处理QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);if (idle_threads > 0)uv_cond_signal(&cond);}}// 不需要操作队列了,尽快释放锁uv_mutex_unlock(&mutex);// q 是慢 IO 或者一般任务w = QUEUE_DATA(q, struct uv__work, wq);// 执行业务的任务函数,该函数一般会阻塞w->work(w);// 准备操作 loop 的任务完成队列,加锁uv_mutex_lock(&w->loop->wq_mutex);// 置空说明指向完了,不能被取消了,见 cancel 逻辑w->work = NULL; // 执行完任务,插入到 loop 的 wq 队列,在 uv__work_done 的时候会执行队列中节点的 done 函数QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);// 通知 loop 的 wq_async 节点uv_async_send(&w->loop->wq_async);uv_mutex_unlock(&w->loop->wq_mutex);// 为下一轮操作任务队列加锁uv_mutex_lock(&mutex);if (is_slow_work) {// 执行完慢 IO 任务,记录正在执行的慢 IO 个数变量减 1,上面加锁保证了互斥访问这个变量slow_io_work_running--;}} }
主进程初始化线程池的过程
- 在
uv_loop_init
中uv_async_init(loop, &loop->wq_async, uv__work_done);
- 线程池中的线程执行的函数
work
最后有一句uv_async_send(&w->loop->wq_async);
- wq_async 是 用于线程池和主线程通信的 async handle 。 他对应 的回调是 uv__work_done 。所以当 一 个 线 程 池 的 线 程 任 务 完 成 时 , 通 过 uv_async_send(&w->loop->wq_async)设置 loop->wq_async.pending = 1,然后 通知 io 观察者。Libuv 在 poll io 阶段就会执行该 handle 对应的回调。该 io 观察者的 回调是 uv__work_done 函数
uv__work_done
- 源码
void uv__work_done(uv_async_t* handle) {struct uv__work* w;uv_loop_t* loop;QUEUE* q;QUEUE wq;int err;// 通过结构体字段获得结构体首地址loop = container_of(handle, uv_loop_t, wq_async);// 准备处理队列,加锁uv_mutex_lock(&loop->wq_mutex);// 把 loop->wq 队列的节点全部移到 wp 变量中,这样一来可以尽快释放锁QUEUE_MOVE(&loop->wq, &wq);// 不需要使用了,解锁uv_mutex_unlock(&loop->wq_mutex);// wq 队列的节点来源是在线程的 worker 里插入while (!QUEUE_EMPTY(&wq)) {q = QUEUE_HEAD(&wq);QUEUE_REMOVE(q);w = container_of(q, struct uv__work, wq);err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;// 执行回调w->done(w, err);} }
uv__work_cancel 取消提交的任务
- 源码
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {int cancelled;// 加锁,为了把节点移出队列uv_mutex_lock(&mutex);// 加锁,为了判断 w->wq 是否为空uv_mutex_lock(&w->loop->wq_mutex);/*w 在任务队列中并且任务函数 work 不为空,则可取消,在 work 函数中,如果执行完了任务,会把 work 置 NULL,所以一个任务可以取消的前提是他还没执行完。或者说还没执行过*/cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;// 从任务队列中删除该节点if (cancelled)QUEUE_REMOVE(&w->wq);uv_mutex_unlock(&w->loop->wq_mutex);uv_mutex_unlock(&mutex);// 不能取消if (!cancelled)return UV_EBUSY;// 重置回调函数w->work = uv__cancelled;uv_mutex_lock(&loop->wq_mutex);/*插入 loop 的 wq 队列,对于取消的动作,libuv 认为是任务执行完了。所以插入已完成的队列,不过他的回调是 uv__cancelled 函数,而不是用户设置的回调*/QUEUE_INSERT_TAIL(&loop->wq, &w->wq);// 通知主线程有任务完成uv_async_send(&loop->wq_async);uv_mutex_unlock(&loop->wq_mutex);return 0; }
Libuv源码分析 —— 8. 线程池相关推荐
- 深入源码分析Java线程池的实现原理
转载自 深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...
- Hbase Compaction 源码分析 - CompactSplitThread 线程池选择
目录 CompactSplitThread requestCompactionInternal方法 selectCompaction方法 requestCompaction方法 其他相关文章 Hbas ...
- 从原理到实现丨手把手教你写一个线程池丨源码分析丨线程池内部组成及优化
人人都能学会的线程池 手写完整版 1. 线程池的使用场景 2. 线程池的内部组成 3. 线程池优化 [项目实战]从原理到实现丨手把手教你写一个线程池丨源码分析丨线程池内部组成及优化 内容包括:C/C+ ...
- 从源码分析创建线程池的4种方式
摘要:从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池. 本文分享自华为云社区<[高并发]从源码角度分析创建线程池究竟有哪些方式>,作者:冰 河 . 在Java的高并发领域,线程池 ...
- nginx源码分析之内存池与线程池丨nginx的多进程网络实现
nginx源码分析之内存池与线程池 1. nginx的使用场景 2. nginx源码 内存池,线程池,日志 3. nginx的多进程网络实现 视频讲解如下,点击观看: [Linux后台开发系统]ngi ...
- 从源码角度解析线程池中顶层接口和抽象类
摘要:我们就来看看线程池中那些非常重要的接口和抽象类,深度分析下线程池中是如何将抽象这一思想运用的淋漓尽致的. 本文分享自华为云社区<[高并发]深度解析线程池中那些重要的顶层接口和抽象类> ...
- Netty技术细节源码分析-Recycler对象池原理分析
本文是该篇的修正版 本文的github地址:点此 该文所涉及的netty源码版本为4.1.6. Netty的对象池Recycler是什么 Recycler是Netty中基于ThreadLocal的轻量 ...
- hibernate 并发获取session失败 空指针_高并发之|通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程...
核心逻辑概述 ThreadPoolExecutor是Java线程池中最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并通过原子方式更新线程池每个阶段的状态. ThreadPoolExecu ...
- Libuv源码分析 —— 9. DNS
在上节中,我们学会了线程池的执行流程,在这一节中,咱们一起了解 DNS 是如何利用线程池完成解析这种慢IO操作的. DNS Libuv 提供了一个异步 dns 解析的能力.包括通过域名查询 ip 和 ...
最新文章
- 初学Python——文件操作第三篇
- 使用 NSUserDefaults 存储字典的一个坑
- vant 索引城市不对_Vant Area 省市区选择
- 启发式搜索给神经网络_神经科学如何支持UX启发式
- ajax异步加载和cmd,异步传输Ajax(JQ)
- 数组的foreach方法和jQuery中的each方法
- qtableview点击行将整行数据传过去_掌握这15个可视化图表,小白也能轻松玩转数据分析...
- java判断三位数的范围代码_java判断三位数的实例讲解
- hdu 2046 骨牌铺方格
- 西瓜书读书笔记3-对数几率回归(logistic回归)公式推导
- DevOps使用教程 华为云(15)git如何将本地项目初始化为远程仓库
- TDengine C/C++ Connector
- ASCII码字符对照表 阿斯克码表
- 沪漂五年:我是如何从职场失意,走向皮实的人生?
- 人工智能自然语言处理技术处理专业领域的运用
- UWP 应用通知Notifications
- 动名词到底什么时候才用? ———— 英语菜鸟最后的倔强!
- 爬虫清洗:python strip()函数 去空格\n\r\t函数的用法
- 有关振动试验夹具的问题
- EXP9 web安全基础实践