Work Stealing Pool线程池

文章目录

  • Work Stealing Pool线程池
    • 1. Work-Stealing算法
    • 2. 线程池的组成
    • 3. 步骤说明
    • 4. 源码

1. Work-Stealing算法

Work-Stealing算法的理念在于让空闲的线程从忙碌的线程的双端队列中偷取任务.

默认情况下, 一个工作线程从它自己内部的双端队列的头部获取任务. 当线程的的队列中没有任务,它从另外的繁忙的线程的双端队列(或者全局的双端队列)的尾部获取任务,因为队列的尾部是最有可能存在还未执行的任务.

这种方式减小了线程之间对任务的竞争的可能性,它也使得线程以最大可能性去获取可执行的线程,因为它们总是在最有可能存在还未执行的任务的地方寻找任务.

2. 线程池的组成

一般来说实现一个线程池主要包括以下几个组成部分:

  • 线程管理器 :用于创建并管理线程池 。

  • 工作线程 :线程池中实际执行任务的线程 。 在初始化线程时会预先创建好固定数目的线程在池中 ,这些初始化的线程一般是处于空闲状态 ,不消耗CPU,占用较小的内存空间 。

  • 任务接口 :每个任务必须实现的接口 ,当线程池中的可执行的任务时 ,被工作线程调试执行。 把任务抽象出来形成任务接口 ,可以做到线程池与具体的任务无关 。

  • 任务队列 :用来存放没有处理的任务,提 供一种缓冲机制。 实现这种结构有好几种方法 ,常用的是队列 ,主要是利用它先进先出的工作原理;另外一种是链表之类的数据结构 ,可以动态为它分配内存空间 ,应用中比较灵活

3. 步骤说明

创建和启动一个Task类,但是不同的是线程池中的每一个线程都有一个本地队列。线程池通过一个任务调度器来分配任务,当主程序创建了一个Task后,由于创建这个Task的线程不是线程池中的线程,则任务调度器会把该Task放入全局队列中。

下面的演示图,Task1和Task2都是主程序创建的,因此都是放在全局队列中,当工作者线程处理Task2时,创建了一个Task3,此时Task3被放入本地队列

为什么要设计本地队列?这样做的优势是充分利用并行。随着越来越多线程竞争工作项,所有的线程访问单一的队列并不是最优的,并且也不安全。所以,将任务放入本地队列,并且由同一个线程处理,这就避免了竞争。

本地队列中的Task,线程会按照LIFO的方式去处理。这是因为在大多数场景下,最后创建的Task可能仍然在cache中,处理它能够提供缓存命中率。显然这意味放弃部分公平性而保证性能。如下面的演示图,

工作者线程1创建了Task2,Task2创建了Task3,Task4,Task5,但最先处理的还是Task5。

线程窃取work stealing

当 A线程开始执行的时候,优先总是处理本地队列中的任务,当它发现本地队列已经空了,那么它会去全局队列中获取Task,当全局队列中也是空的,那么就会发 生工作窃取(work stealing)。任务调度器会把该线程池中额外的任务分配给A线程处理,其效果就好比该线程会才从其他线程的队列中“窃取”一个Task来执行。这样的目的是提高了cpu的使用效率

4. 源码

/** StealThreadPool.h**  Copyright (c) 2019 hikyuu.org**  Created on: 2019-9-16*      Author: fasiondog*/#pragma once
#ifndef HIKYUU_UTILITIES_THREAD_STEALTHREADPOOL_H
#define HIKYUU_UTILITIES_THREAD_STEALTHREADPOOL_H//#include <fmt/format.h>
#include <future>
#include <thread>
#include <chrono>
#include <vector>
#include "ThreadSafeQueue.h"
#include "WorkStealQueue.h"namespace hku {/*** @brief 分布偷取式线程池* @note 主要用于存在递归情况,任务又创建任务加入线程池的情况,否则建议使用普通的线程池* @details* @ingroup ThreadPool*/
class StealThreadPool {public:/*** 默认构造函数,创建和当前系统CPU数一致的线程数*/StealThreadPool() : StealThreadPool(std::thread::hardware_concurrency()) {}/*** 构造函数,创建指定数量的线程* @param n 指定的线程数*/explicit StealThreadPool(size_t n) : m_done(false), m_init_finished(false), m_worker_num(n) {try {for (size_t i = 0; i < m_worker_num; i++) {// 创建工作线程及其任务队列m_queues.push_back(std::unique_ptr<WorkStealQueue>(new WorkStealQueue));m_threads.push_back(std::thread(&StealThreadPool::worker_thread, this, i));}} catch (...) {m_done = true;throw;}m_init_finished = true;}/*** 析构函数,等待并阻塞至线程池内所有任务完成*/~StealThreadPool() {if (!m_done) {join();}}/** 获取工作线程数 */size_t worker_num() const {return m_worker_num;}/** 先线程池提交任务后返回的对应 future 的类型 */template <typename ResultType>using task_handle = std::future<ResultType>;/** 向线程池提交任务 */template <typename FunctionType>task_handle<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {typedef typename std::result_of<FunctionType()>::type result_type;std::packaged_task<result_type()> task(f);task_handle<result_type> res(task.get_future());if (m_local_work_queue) {// 本地线程任务从前部入队列(递归成栈)// 因为在大多数场景下,最后创建的Task可能仍然在cache中,处理它能够提供缓存命中率// 显然这意味放弃部分公平性而保证性能m_local_work_queue->push_front(std::move(task));} else {m_master_work_queue.push(std::move(task));}m_cv.notify_one();return res;}/** 返回线程池结束状态 */bool done() const {return m_done;}/*** 等待各线程完成当前执行的任务后立即结束退出*/void stop() {m_done = true;// 同时加入结束任务指示,以便在dll退出时也能够终止for (size_t i = 0; i < m_worker_num; i++) {m_queues[i]->push_front(std::move(FuncWrapper()));}m_cv.notify_all();  // 唤醒所有工作线程for (size_t i = 0; i < m_worker_num; i++) {if (m_threads[i].joinable()) {m_threads[i].join();}}}/*** 等待并阻塞至线程池内所有任务完成* @note 至此线程池能工作线程结束不可再使用*/void join() {// 指示各工作线程在未获取到工作任务时,停止运行for (size_t i = 0; i < m_worker_num; i++) {m_master_work_queue.push(std::move(FuncWrapper()));}// 唤醒所有工作线程m_cv.notify_all();// 等待线程结束for (size_t i = 0; i < m_worker_num; i++) {if (m_threads[i].joinable()) {m_threads[i].join();}}m_done = true;}private:typedef FuncWrapper task_type;std::atomic_bool m_done;       // 线程池全局需终止指示bool m_init_finished;          // 线程池是否初始化完毕size_t m_worker_num;           // 工作线程数量std::condition_variable m_cv;  // 信号量,无任务时阻塞线程并等待std::mutex m_cv_mutex;         // 配合信号量的互斥量ThreadSafeQueue<task_type> m_master_work_queue;          // 主线程任务队列std::vector<std::unique_ptr<WorkStealQueue> > m_queues;  // 任务队列(每个工作线程一个)std::vector<std::thread> m_threads;                      // 工作线程// 线程本地变量inline static thread_local WorkStealQueue* m_local_work_queue = nullptr;  // 本地任务队列inline static thread_local size_t m_index = 0;               //在线程池中的序号inline static thread_local bool m_thread_need_stop = false;  // 线程停止运行指示void worker_thread(size_t index) {m_thread_need_stop = false;m_index = index;m_local_work_queue = m_queues[m_index].get();while (!m_thread_need_stop && !m_done) {run_pending_task();}}void run_pending_task() {// 从本地队列提前工作任务,如本地无任务则从主队列中提取任务// 如果主队列中提取的任务是空任务,则认为需结束本线程,否则从其他工作队列中偷取任务task_type task;if (pop_task_from_local_queue(task)) {task();std::this_thread::yield();} else if (pop_task_from_master_queue(task)) {if (!task.isNullTask()) {task();std::this_thread::yield();} else {m_thread_need_stop = true;}} else if (pop_task_from_other_thread_queue(task)) {task();std::this_thread::yield();} else {// std::this_thread::yield();std::unique_lock<std::mutex> lk(m_cv_mutex);m_cv.wait(lk, [=] { return this->m_done || !this->m_master_work_queue.empty(); });}}bool pop_task_from_master_queue(task_type& task) {return m_master_work_queue.try_pop(task);}bool pop_task_from_local_queue(task_type& task) {return m_local_work_queue && m_local_work_queue->try_pop(task);}bool pop_task_from_other_thread_queue(task_type& task) {// 线程池尚未初始化化完成时,其他任务队列可能尚未创建// 此时不能从其他队列偷取任务if (!m_init_finished) {return false;}for (size_t i = 0; i < m_worker_num; ++i) {size_t index = (m_index + i + 1) % m_worker_num;if (m_queues[index]->try_steal(task)) {return true;}}return false;}
};  // namespace hku} /* namespace hku */#endif /* HIKYUU_UTILITIES_THREAD_STEALTHREADPOOL_H */

References:

[1]. http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx

[2]. https://www.cnblogs.com/ok-wolf/p/7761755.html

[3]. github中Work Stealing Pool源码实现

Work Stealing Pool线程池相关推荐

  1. Spring Thread Pool 线程池的应用

    Spring and Java Thread example 扫扫关注"茶爸爸"微信公众号 坚持最初的执着,从不曾有半点懈怠,为优秀而努力,为证明自己而活. Download it ...

  2. 新手一看就懂的线程池

    那相信大家也能感受到,其实用多线程是很麻烦的,包括线程的创建.销毁和调度等等,而且我们平时工作时好像也并没有这样来 new 一个线程,其实是因为很多框架的底层都用到了线程池. 线程池是帮助我们管理线程 ...

  3. Java多线程- 线程池的基本使用和执行流程分析 - ThreadPoolExecutor

    线程池的实现原理 池化技术 一说到线程池自然就会想到池化技术. 其实所谓池化技术,就是把一些能够复用的东西放到池中,避免重复创建.销毁的开销,从而极大提高性能. 常见池化技术的例如: 线程池 内存池 ...

  4. Android线程池封装库

    目录介绍 1.遇到的问题和需求 1.1 遇到的问题有哪些 1.2 遇到的需求 1.3 多线程通过实现Runnable弊端 1.4 为什么要用线程池 2.封装库具有的功能 2.1 常用的功能 3.封装库 ...

  5. 简单Linux C线程池

    大多数的网络服务器,包括Web服务器都具有一个特点,就是单位时间内必须处理数目巨大的连接请求,但是处理时间却是比较短的.在传统的多线程服务器模型中是这样实现的:一旦有个请求到达,就创建一个新的线程,由 ...

  6. 10-异步爬虫(线程池/asyncio协程)实战案例

    异步爬虫: 基于线程池 基于单线程+多任务的异步爬虫 线程池 from multiprocessing.dummy import Pool map(callback,alist) 可以使用callba ...

  7. 0726------Linux基础----------线程池

    #ifndef __DEF_H__ #define __DEF_H__#include <stddef.h> #include <pthread.h> #include < ...

  8. 210228Linux 条件变量 线程池

    目录 一.学习的知识点 有了互斥锁 为什么还要条件变量 1 互斥锁 实际应用把服务器分为两个进程 1.前置服务器 1.1方案1 来一个客户端开一个线程 1.2方案2 线程池 2.后置服务器 二.上课没 ...

  9. 爬虫之基于线程池异步抓取

    from multiprocessing.dummy import Pool #线程池模块#必须只可以有一个参数 def my_requests(url):return requests.get(ur ...

  10. linux网络编程(四)线程池

    linux网络编程(四)线程池 为什么会有线程池? 实现简单的线程池 为什么会有线程池? 大多数的服务器可能都有这样一种情况,就是会在单位时间内接收到大量客户端请求,我们可以采取接受到客户端请求创建一 ...

最新文章

  1. 软件开发人员维护代码指南
  2. 演硬汉才是布鲁斯威利斯的正事 --- 我看《虎胆追凶》
  3. python3字典升序排序_Python字典和元组总结
  4. python中怎么精确20位_Python中的精确处理
  5. jmeter进程和线程的区别_一文搞懂进程和线程的区别
  6. 博弈论 —— python
  7. yum方式安装android_Android-x86尝鲜续 系统详细安装教程
  8. ActiveMQ 无法启动 提示端口被占用 解决方案
  9. 手游修改 wpe封包 fiddler抓包 逆向破解 gg修改 哪种最厉害?
  10. 使用xftp怎么向服务器传输文件,如何使用文件传输软件Xftp
  11. HackTheBox::Blunder
  12. 解决双屏显示不一致的问题
  13. 【计算机网络】第六部分 应用层(25) 域名空间
  14. “东方树叶”走红背后,起底农夫山泉的“科研军团”
  15. java左手画圆右手画方_左手画圆右手画方可以同时进行吗?
  16. [游戏技术]VampireSurvivors PC版修改
  17. 每个架构师都应该了解的理论:康威定律
  18. 一棵叫默克尔的神奇之树
  19. CString - 详解
  20. 【长更】一句话题解(组队训练的俄罗斯题、oj、camp)

热门文章

  1. gradient设置上下渐变_CSS3,线性渐变(linear-gradient)的使用总结
  2. 经典算法:鸡蛋掉落问题
  3. GitHub项目徽章的添加和设置
  4. Emule服务器与设置
  5. win10怎么录制电脑屏幕 电脑录制视频
  6. (过桥问题)小明一家过一座桥,过桥时是黑夜,所以必须有灯
  7. C4D渲染保存多通道psd格式,图层都是线性减淡模式,oc多通道图层都是线性简单模式
  8. [个人tricks].chm格式电子书无法正常显示的两种解决办法(亲测有用)
  9. java sort 没法用,$ group无法使用Spring聚合类后的$ sort管道
  10. 计算机网络之物理层,数据链路层,网络层 学习笔记