网络I/O

  • 在 上一节 的学习中,我们已经搞明白了网络I/O的基本过程,并通过了解进程/线程间通信来熟悉这个流程。下面,让咱们学习线程池中的线程如何工作、并和主进程进行通信的吧!

线程池

  • Libuv 是基于事件驱动的异步库。对于耗时的操作。如果在 Libuv 的主循环里执行的话, 就会阻塞后面的任务执行。所以 Libuv 里维护了一个线程池。他负责处理 Libuv 中耗时 的操作,比如文件 io、dns、用户自定义的耗时任务(文件 io 因为存在跨平台兼容的问 题。无法很好地在事件驱动模块实现异步 io)
  • 线程池是全局的,并且在所有事件循环中共享

Thread pool work scheduling

数据类型
  • uv_work_t

    工作请求类型。

API
  • int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb)

    初始化一个工作请求,它将在线程池中的一个线程中运行给定的work_cb。一旦work_cb完成,将在循环线程上调用after_work_cb 。

    可以使用 取消此请求uv_cancel()

example
  • #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>#include <uv.h>#define FIB_UNTIL 25
    uv_loop_t *loop;long fib_(long t) {if (t == 0 || t == 1)return 1;elsereturn fib_(t-1) + fib_(t-2);
    }// 将在不同的函数中运行
    void fib(uv_work_t *req) {int n = *(int *) req->data;if (random() % 2)sleep(1);elsesleep(3);long fib = fib_(n);fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
    }void after_fib(uv_work_t *req, int status) {fprintf(stderr, "Done calculating %dth fibonacci\n", *(int *) req->data);
    }/*我们将要执行fibonacci数列,并且睡眠一段时间,将阻塞和cpu占用时间长的任务分配到不同的线程,使得其不会阻塞event loop上的其他任务
    */
    int main() {loop = uv_default_loop();int data[FIB_UNTIL];uv_work_t req[FIB_UNTIL];   // 子线程的参数int i;for (i = 0; i < FIB_UNTIL; i++) {data[i] = i;// 可以通过void *data传递任何数据,使用它来完成线程之间的沟通任务req[i].data = (void *) &data[i];uv_queue_work(loop, &req[i], fib, after_fib);}return uv_run(loop, UV_RUN_DEFAULT);
    }/*执行结果0th fibonacci is 12th fibonacci is 23th fibonacci is 3Done calculating 0th fibonacciDone calculating 2th fibonacciDone calculating 3th fibonacci4th fibonacci is 55th fibonacci is 8Done calculating 4th fibonacciDone calculating 5th fibonacci1th fibonacci is 1Done calculating 1th fibonacci8th fibonacci is 34Done calculating 8th fibonacci9th fibonacci is 55Done calculating 9th fibonacci6th fibonacci is 13Done calculating 6th fibonacci11th fibonacci is 144Done calculating 11th fibonacci7th fibonacci is 21Done calculating 7th fibonacci13th fibonacci is 377Done calculating 13th fibonacci14th fibonacci is 610Done calculating 14th fibonacci10th fibonacci is 89Done calculating 10th fibonacci12th fibonacci is 233Done calculating 12th fibonacci15th fibonacci is 987Done calculating 15th fibonacci16th fibonacci is 159717th fibonacci is 2584Done calculating 16th fibonacciDone calculating 17th fibonacci18th fibonacci is 4181Done calculating 18th fibonacci20th fibonacci is 10946Done calculating 20th fibonacci22th fibonacci is 28657Done calculating 22th fibonacci23th fibonacci is 46368Done calculating 23th fibonacci19th fibonacci is 6765Done calculating 19th fibonacci21th fibonacci is 17711Done calculating 21th fibonacci24th fibonacci is 75025Done calculating 24th fibonacci
    */
    

线程间的同步原语

Mutex锁

互斥锁用于对资源的互斥访问,当你访问的内存资源可能被别的线程访问到,这个时候你就可以考虑使用互斥锁,在访问的时候锁住。对应的使用流程可能是这样的:

读写锁
信号量

信号量是一种专门用于提供不同进程间或线程间同步手段的原语。信号量本质上是一个非负整数计数器,代表共享资源的数目,通常是用来控制对共享资源的访问。一般使用步骤是这样的:

条件变量

条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足。条件变量的内部实质上是一个等待队列,放置等待(阻塞)的线程,线程在条件变量上等待和通知,互斥锁用来保护等待队列(因为所有的线程都可以放入等待队列,所以等待队列成为了一个共享的资源,需要被上锁保护),因此条件变量通常和互斥锁一起使用。一般使用步骤是这样的:

屏障

在多线程的时候,我们总会碰到一个需求,就是需要等待一组进程全部执行完毕后再执行某些事,由于多线程是乱序的,无法预估线程都执行到哪里了,这就要求我们有一个屏障作为同步点,在所有有屏障的地方都会阻塞等待,直到所有的线程都的代码都执行到同步点,再继续执行后续代码。使用步骤一般是:

源码解析

init_threads

  • 线程池的初始化主要是初始化一些数据结构,然后创建多个线程。接着在每个线程里执行 worker 函数
  • 源码
    static void init_threads(void) {unsigned int i;const char* val;uv_sem_t sem;// 默认线程数 4 个,static uv_thread_t default_threads[4];nthreads = ARRAY_SIZE(default_threads);// 判断用户是否在环境变量中设置了线程数,是的话取用户定义的val = getenv("UV_THREADPOOL_SIZE");if (val != NULL)nthreads = atoi(val);if (nthreads == 0)nthreads = 1;// #define MAX_THREADPOOL_SIZE 128 最多 128 个线程if (nthreads > MAX_THREADPOOL_SIZE)nthreads = MAX_THREADPOOL_SIZE;threads = default_threads;// 超过默认大小,重新分配内存if (nthreads > ARRAY_SIZE(default_threads)) {threads = uv__malloc(nthreads * sizeof(threads[0]));// 分配内存失败,回退到默认if (threads == NULL) {nthreads = ARRAY_SIZE(default_threads);threads = default_threads;}}// 初始化条件变量if (uv_cond_init(&cond))abort();// 初始化互斥变量if (uv_mutex_init(&mutex))abort();// 初始化三个队列QUEUE_INIT(&wq);QUEUE_INIT(&slow_io_pending_wq);QUEUE_INIT(&run_slow_work_message);// 初始化信号量变量,值为 0if (uv_sem_init(&sem, 0))abort();// 创建多个线程,工作函数为 worker,sem 为 worker 入参for (i = 0; i < nthreads; i++)if (uv_thread_create(threads + i, worker, &sem))abort();// 为 0 则阻塞,非 0 则减一,这里等待所有线程启动成功再往下执行for (i = 0; i < nthreads; i++)uv_sem_wait(&sem);uv_sem_destroy(&sem);
    }
    

uv__work_submit — 给线程池提交一个任务

  • uv_work

    struct uv__work {void (*work)(struct uv__work *w);void (*done)(struct uv__work *w, int status);struct uv_loop_s* loop;void* wq[2];
    };
    
  • 源码
void uv__work_submit(uv_loop_t* loop,struct uv__work* w,enum uv__work_kind kind,void (*work)(struct uv__work* w),void (*done)(struct uv__work* w, int status)) {// 保证已经初始化线程,并只执行一次,所以线程池是在提交第一个任务的时候才被初始化uv_once(&once, init_once);w->loop = loop;w->work = work;w->done = done;// 调 post 往线程池的队列中加入一个新的任务。Libuv 把任务分为三种类型,慢// io(dns 解析)、快 io(文件操作)、cpu 密集型等,kind 就是说明任务的类型的post(&w->wq, kind);
}static void init_once(void) {#ifndef _WIN32if (pthread_atfork(NULL, NULL, &reset_once))abort();#endifinit_threads();
}// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {// 加锁访问任务队列,因为这个队列是线程池共享的uv_mutex_lock(&mutex);// 类型是慢 IOif (kind == UV__WORK_SLOW_IO) {/* 插入慢 IO 对应的队列,llibuv 这个版本把任务分为几种类型,对于慢 io 类型的任务,libuv 是往任务队列里面插入一个特殊的节点run_slow_work_message,然后用 slow_io_pending_wq 维护了一个慢 io 任务的队列,当处理到 run_slow_work_message 这个节点的时候,libuv 会从 slow_io_pending_wq队列里逐个取出任务节点来执行。*/QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);/*有慢 IO 任务的时候,需要给主队列 wq 插入一个消息节点 run_slow_work_message,说明有慢 IO 任务,所以如果 run_slow_work_message 是空,说明还没有插入主队列。需要进行 q = &run_slow_work_message;赋值,然后把 run_slow_work_message 插入主队列。如果 run_slow_work_message 非空,说明已经插入线程池的任务队列了。解锁然后直接返回。*/if (!QUEUE_EMPTY(&run_slow_work_message)) {/* Running slow I/O tasks is already scheduled => Nothing to do here.The worker that runs said other task will schedule this one as well. */uv_mutex_unlock(&mutex);return;}// 说明 run_slow_work_message 还没有插入队列,准备插入队列q = &run_slow_work_message;}// 把节点插入主队列,可能是慢 IO 消息节点(如果遍历这个队列发现是消息节点// 就可以执行 slow_io_pending_wq 队列里的任务了)或者一般任务(直接执行)QUEUE_INSERT_TAIL(&wq, q);// 有空闲线程则唤醒他,如果大家都在忙,则等到他忙完后就会重新判断是否还有新任务if (idle_threads > 0)uv_cond_signal(&cond);uv_mutex_unlock(&mutex);
}

uv_queue_work — 针对cpu密集型提交一个任务

  • 通过 uv_queue_work 提交的任务,是对应一个 request 的。如果该 request 对应的任务没有执行完,则事件循环不会退出。而通过 uv__work_submit 方式提交的任务就算没有执行完,也不会影响事件循环的退出。
  • uv_work_t
struct uv_work_t {UV_REQ_FIELDSuv_loop_t* loop;uv_work_cb work_cb;uv_after_work_cb after_work_cb;UV_WORK_PRIVATE_FIELDS
};
  • 源码

    int uv_queue_work(uv_loop_t* loop,uv_work_t* req,uv_work_cb work_cb,uv_after_work_cb after_work_cb) {if (work_cb == NULL)return UV_EINVAL;// 使 (loop)->active_reqs.count++uv__req_init(loop, req, UV_WORK);req->loop = loop;req->work_cb = work_cb;req->after_work_cb = after_work_cb;uv__work_submit(loop,&req->work_req,UV__WORK_CPU,      // 是CPU密集型的uv__queue_work,    // 当这个任务被执行的时候。他会执行函数 uv__queue_workuv__queue_done);   // 当这个任务执行结束的时候。他会执行函数 uv__queue_donereturn 0;
    }static void uv__queue_work(struct uv__work* w) {// 通过结构体某字段拿到结构体地址uv_work_t* req = container_of(w, uv_work_t, work_req);req->work_cb(req);
    }static void uv__queue_done(struct uv__work* w, int err) {uv_work_t* req;req = container_of(w, uv_work_t, work_req);// 使 (loop)->active_reqs.count--uv__req_unregister(req->loop, req);if (req->after_work_cb == NULL)return;req->after_work_cb(req, err);
    }
    

worker —— 线程池中的线程执行的函数

  • 线程池中把任务分为三种。并且对于慢 io 类型的 任务,还限制了线程数。其余的逻辑和一般的线程池类似,就是互斥访问任务队列,然后取出节点执行,最后执行回调。不过 libuv 这里不是直接回调用户的函数。而是通知主进程。由主进程处理
  • 源码
     // 该线程池在用户提交了第一个任务的时候初始化,而不是系统启动的时候就初始化
    static void worker(void* arg) {struct uv__work* w;QUEUE* q;int is_slow_work;// 线程启动成功,因为初始化线程的时候,等待所有线程都执行成功之后才会往下执行uv_sem_post((uv_sem_t*) arg);arg = NULL;// 加锁互斥访问任务队列uv_mutex_lock(&mutex);for (;;) {/*1 队列为空,2 队列不为空,但是队列里只有慢 IO 任务且正在执行的慢 IO 任务个数达到阈值则空闲线程加一,防止慢 IO 占用过多线程,导致其他快的任务无法得到执行*/while (QUEUE_EMPTY(&wq) ||(QUEUE_HEAD(&wq) == &run_slow_work_message &&QUEUE_NEXT(&run_slow_work_message) == &wq &&slow_io_work_running >= slow_work_thread_threshold())) {idle_threads += 1;// 阻塞,等待队列中有任务的时候唤醒uv_cond_wait(&cond, &mutex);// 被唤醒,开始干活,空闲线程数减一idle_threads -= 1;}// 取出头结点,头指点可能是退出消息、慢 IO,一般请求q = QUEUE_HEAD(&wq);// 如果头结点是退出消息,则结束线程if (q == &exit_message) {// 唤醒其他因为没有任务正阻塞等待任务的线程,别的线程同样取出这个节点,结束线程...// 最后线程会全部结束uv_cond_signal(&cond);uv_mutex_unlock(&mutex);break;}// 移除节点QUEUE_REMOVE(q);// 重置前后指针QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */is_slow_work = 0;/*如果当前节点等于慢 IO 节点,上面的 while 只判断了是不是只有慢 io 任务且达到阈值,这里是任务队列里肯定有非慢 io 任务,可能有慢 io,如果有慢 io 并且正在执行的个数达到阈值,则先不处理该慢 io 任务,继续判断是否还有非慢 io 任务可执行。*/if (q == &run_slow_work_message) {// 遇到阈值,重新入队if (slow_io_work_running >= slow_work_thread_threshold()) {QUEUE_INSERT_TAIL(&wq, q);continue;}// 没有慢 IO 任务则继续if (QUEUE_EMPTY(&slow_io_pending_wq))continue;// 有慢 io,开始处理慢 IO 任务is_slow_work = 1;// 正在处理慢 IO 任务的个数累加,用于其他线程判断慢 IO 任务个数是否达到阈值slow_io_work_running++;// 摘下一个慢 io 任务q = QUEUE_HEAD(&slow_io_pending_wq);QUEUE_REMOVE(q);QUEUE_INIT(q);/*取出一个任务后,如果还有慢 IO 任务则把慢 IO 标记节点重新入队,表示还有慢 IO 任务,因为上面把该标记节点出队了*/if (!QUEUE_EMPTY(&slow_io_pending_wq)) {// 有空闲线程则唤醒他,因为还有任务处理QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);if (idle_threads > 0)uv_cond_signal(&cond);}}// 不需要操作队列了,尽快释放锁uv_mutex_unlock(&mutex);// q 是慢 IO 或者一般任务w = QUEUE_DATA(q, struct uv__work, wq);// 执行业务的任务函数,该函数一般会阻塞w->work(w);// 准备操作 loop 的任务完成队列,加锁uv_mutex_lock(&w->loop->wq_mutex);// 置空说明指向完了,不能被取消了,见 cancel 逻辑w->work = NULL;  // 执行完任务,插入到 loop 的 wq 队列,在 uv__work_done 的时候会执行队列中节点的 done 函数QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);// 通知 loop 的 wq_async 节点uv_async_send(&w->loop->wq_async);uv_mutex_unlock(&w->loop->wq_mutex);// 为下一轮操作任务队列加锁uv_mutex_lock(&mutex);if (is_slow_work) {// 执行完慢 IO 任务,记录正在执行的慢 IO 个数变量减 1,上面加锁保证了互斥访问这个变量slow_io_work_running--;}}
    }
    

主进程初始化线程池的过程

  • uv_loop_init

    uv_async_init(loop, &loop->wq_async, uv__work_done);
    
  • 线程池中的线程执行的函数 work 最后有一句
    uv_async_send(&w->loop->wq_async);
    
  • wq_async 是 用于线程池和主线程通信的 async handle 。 他对应 的回调是 uv__work_done 。所以当 一 个 线 程 池 的 线 程 任 务 完 成 时 , 通 过 uv_async_send(&w->loop->wq_async)设置 loop->wq_async.pending = 1,然后 通知 io 观察者。Libuv 在 poll io 阶段就会执行该 handle 对应的回调。该 io 观察者的 回调是 uv__work_done 函数

uv__work_done

  • 源码

    void uv__work_done(uv_async_t* handle) {struct uv__work* w;uv_loop_t* loop;QUEUE* q;QUEUE wq;int err;// 通过结构体字段获得结构体首地址loop = container_of(handle, uv_loop_t, wq_async);// 准备处理队列,加锁uv_mutex_lock(&loop->wq_mutex);// 把 loop->wq 队列的节点全部移到 wp 变量中,这样一来可以尽快释放锁QUEUE_MOVE(&loop->wq, &wq);// 不需要使用了,解锁uv_mutex_unlock(&loop->wq_mutex);// wq 队列的节点来源是在线程的 worker 里插入while (!QUEUE_EMPTY(&wq)) {q = QUEUE_HEAD(&wq);QUEUE_REMOVE(q);w = container_of(q, struct uv__work, wq);err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;// 执行回调w->done(w, err);}
    }
    

uv__work_cancel 取消提交的任务

  • 源码

    static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {int cancelled;// 加锁,为了把节点移出队列uv_mutex_lock(&mutex);// 加锁,为了判断 w->wq 是否为空uv_mutex_lock(&w->loop->wq_mutex);/*w 在任务队列中并且任务函数 work 不为空,则可取消,在 work 函数中,如果执行完了任务,会把 work 置 NULL,所以一个任务可以取消的前提是他还没执行完。或者说还没执行过*/cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;// 从任务队列中删除该节点if (cancelled)QUEUE_REMOVE(&w->wq);uv_mutex_unlock(&w->loop->wq_mutex);uv_mutex_unlock(&mutex);// 不能取消if (!cancelled)return UV_EBUSY;// 重置回调函数w->work = uv__cancelled;uv_mutex_lock(&loop->wq_mutex);/*插入 loop 的 wq 队列,对于取消的动作,libuv 认为是任务执行完了。所以插入已完成的队列,不过他的回调是 uv__cancelled 函数,而不是用户设置的回调*/QUEUE_INSERT_TAIL(&loop->wq, &w->wq);// 通知主线程有任务完成uv_async_send(&loop->wq_async);uv_mutex_unlock(&loop->wq_mutex);return 0;
    }
    

Libuv源码分析 —— 8. 线程池相关推荐

  1. 深入源码分析Java线程池的实现原理

    转载自   深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...

  2. Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

    目录 CompactSplitThread requestCompactionInternal方法 selectCompaction方法 requestCompaction方法 其他相关文章 Hbas ...

  3. 从原理到实现丨手把手教你写一个线程池丨源码分析丨线程池内部组成及优化

    人人都能学会的线程池 手写完整版 1. 线程池的使用场景 2. 线程池的内部组成 3. 线程池优化 [项目实战]从原理到实现丨手把手教你写一个线程池丨源码分析丨线程池内部组成及优化 内容包括:C/C+ ...

  4. 从源码分析创建线程池的4种方式

    摘要:从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池. 本文分享自华为云社区<[高并发]从源码角度分析创建线程池究竟有哪些方式>,作者:冰 河 . 在Java的高并发领域,线程池 ...

  5. nginx源码分析之内存池与线程池丨nginx的多进程网络实现

    nginx源码分析之内存池与线程池 1. nginx的使用场景 2. nginx源码 内存池,线程池,日志 3. nginx的多进程网络实现 视频讲解如下,点击观看: [Linux后台开发系统]ngi ...

  6. 从源码角度解析线程池中顶层接口和抽象类

    摘要:我们就来看看线程池中那些非常重要的接口和抽象类,深度分析下线程池中是如何将抽象这一思想运用的淋漓尽致的. 本文分享自华为云社区<[高并发]深度解析线程池中那些重要的顶层接口和抽象类> ...

  7. Netty技术细节源码分析-Recycler对象池原理分析

    本文是该篇的修正版 本文的github地址:点此 该文所涉及的netty源码版本为4.1.6. Netty的对象池Recycler是什么 Recycler是Netty中基于ThreadLocal的轻量 ...

  8. hibernate 并发获取session失败 空指针_高并发之|通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程...

    核心逻辑概述 ThreadPoolExecutor是Java线程池中最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并通过原子方式更新线程池每个阶段的状态. ThreadPoolExecu ...

  9. Libuv源码分析 —— 9. DNS

    在上节中,我们学会了线程池的执行流程,在这一节中,咱们一起了解 DNS 是如何利用线程池完成解析这种慢IO操作的. DNS Libuv 提供了一个异步 dns 解析的能力.包括通过域名查询 ip 和 ...

最新文章

  1. 初学Python——文件操作第三篇
  2. 使用 NSUserDefaults 存储字典的一个坑
  3. vant 索引城市不对_Vant Area 省市区选择
  4. 启发式搜索给神经网络_神经科学如何支持UX启发式
  5. ajax异步加载和cmd,异步传输Ajax(JQ)
  6. 数组的foreach方法和jQuery中的each方法
  7. qtableview点击行将整行数据传过去_掌握这15个可视化图表,小白也能轻松玩转数据分析...
  8. java判断三位数的范围代码_java判断三位数的实例讲解
  9. hdu 2046 骨牌铺方格
  10. 西瓜书读书笔记3-对数几率回归(logistic回归)公式推导
  11. DevOps使用教程 华为云(15)git如何将本地项目初始化为远程仓库
  12. TDengine C/C++ Connector
  13. ASCII码字符对照表 阿斯克码表
  14. 沪漂五年:我是如何从职场失意,走向皮实的人生?
  15. 人工智能自然语言处理技术处理专业领域的运用
  16. UWP 应用通知Notifications
  17. 动名词到底什么时候才用? ———— 英语菜鸟最后的倔强!
  18. 爬虫清洗:python strip()函数 去空格\n\r\t函数的用法
  19. 有关振动试验夹具的问题
  20. EXP9 web安全基础实践

热门文章

  1. rally功能分析与使用介绍
  2. java 二进制 2个字节 高位 低位_高位字节,低位字节应该怎么理解
  3. 判断等腰三角形java_JAVA如何编写程序判断一个三角形是否为等腰三角形
  4. php 获取文件夹下面的文件列表和文件夹列表
  5. spring boot 转pdf (html转pdf)
  6. java将html转pdf
  7. 2 GateWay工作流程+GateWay搭建
  8. UML图之四——活动图
  9. 这可能是中国最好的13个开源项目
  10. 文件上传、下载、导出(图片上传、下载)