基于C++实现线程池加速
经过长期探索,发现一个不需要手动设置线程休眠时间(e.g. std::this_thread::sleep_for(std::chrono::microseconds(1)))的代码:
Github: https://github.com/log4cplus/ThreadPool
#ifndef THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
#define THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <functional>
#include <stdexcept>
#include <algorithm>
#include <cassert>namespace progschj {class ThreadPool {public:explicit ThreadPool(std::size_t threads= (std::max)(2u, std::thread::hardware_concurrency()));template<class F, class... Args>auto enqueue(F&& f, Args&&... args)->std::future<typename std::result_of<F(Args...)>::type>;void wait_until_empty();void wait_until_nothing_in_flight();void set_queue_size_limit(std::size_t limit);void set_pool_size(std::size_t limit);~ThreadPool();private:void start_worker(std::size_t worker_number,std::unique_lock<std::mutex> const &lock);// need to keep track of threads so we can join themstd::vector< std::thread > workers;// target pool sizestd::size_t pool_size;// the task queuestd::queue< std::function<void()> > tasks;// queue length limitstd::size_t max_queue_size = 100000;// stop signalbool stop = false;// synchronizationstd::mutex queue_mutex;std::condition_variable condition_producers;std::condition_variable condition_consumers;std::mutex in_flight_mutex;std::condition_variable in_flight_condition;std::atomic<std::size_t> in_flight;struct handle_in_flight_decrement{ThreadPool & tp;handle_in_flight_decrement(ThreadPool & tp_): tp(tp_){ }~handle_in_flight_decrement(){std::size_t prev= std::atomic_fetch_sub_explicit(&tp.in_flight,std::size_t(1),std::memory_order_acq_rel);if (prev == 1){std::unique_lock<std::mutex> guard(tp.in_flight_mutex);tp.in_flight_condition.notify_all();}}};};// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(std::size_t threads): pool_size(threads), in_flight(0){std::unique_lock<std::mutex> lock(this->queue_mutex);for (std::size_t i = 0; i != threads; ++i)start_worker(i, lock);}// add new work item to the pool// 有两种方法可以实现调用类成员,// 一种是使用 bind: .enqueue(std::bind(&Dog::sayHello, &dog));// 一种是用 mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this)template<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();std::unique_lock<std::mutex> lock(queue_mutex);if (tasks.size() >= max_queue_size)// wait for the queue to empty or be stoppedcondition_producers.wait(lock,[this]{return tasks.size() < max_queue_size|| stop;});// don't allow enqueueing after stopping the poolif (stop)//若线程池已经开始析构,这是不允许加入新事件throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });std::atomic_fetch_add_explicit(&in_flight,std::size_t(1),std::memory_order_relaxed);condition_consumers.notify_one();return res;}// the destructor joins all threadsinline ThreadPool::~ThreadPool(){std::unique_lock<std::mutex> lock(queue_mutex);stop = true;pool_size = 0;condition_consumers.notify_all();condition_producers.notify_all();condition_consumers.wait(lock, [this] { return this->workers.empty(); });assert(in_flight == 0);}inline void ThreadPool::wait_until_empty(){std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition_producers.wait(lock,[this] { return this->tasks.empty(); });}inline void ThreadPool::wait_until_nothing_in_flight(){std::unique_lock<std::mutex> lock(this->in_flight_mutex);this->in_flight_condition.wait(lock,[this] { return this->in_flight == 0; });}inline void ThreadPool::set_queue_size_limit(std::size_t limit){std::unique_lock<std::mutex> lock(this->queue_mutex);if (stop)return;std::size_t const old_limit = max_queue_size;max_queue_size = (std::max)(limit, std::size_t(1));if (old_limit < max_queue_size)condition_producers.notify_all();}inline void ThreadPool::set_pool_size(std::size_t limit){if (limit < 1)limit = 1;std::unique_lock<std::mutex> lock(this->queue_mutex);if (stop)return;std::size_t const old_size = pool_size;assert(this->workers.size() >= old_size);pool_size = limit;if (pool_size > old_size){// create new worker threads// it is possible that some of these are still running because// they have not stopped yet after a pool size reduction, such// workers will just keep runningfor (std::size_t i = old_size; i != pool_size; ++i)start_worker(i, lock);}else if (pool_size < old_size)// notify all worker threads to start downsizingthis->condition_consumers.notify_all();}inline void ThreadPool::start_worker(std::size_t worker_number, std::unique_lock<std::mutex> const &lock){assert(lock.owns_lock() && lock.mutex() == &this->queue_mutex);assert(worker_number <= this->workers.size());auto worker_func =[this, worker_number]{for (;;){std::function<void()> task;bool notify;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition_consumers.wait(lock,[this, worker_number] {return this->stop || !this->tasks.empty()|| pool_size < worker_number + 1; });// deal with downsizing of thread pool or shutdownif ((this->stop && this->tasks.empty())|| (!this->stop && pool_size < worker_number + 1)){// detach this worker, effectively marking it stoppedthis->workers[worker_number].detach();// downsize the workers vector as much as possiblewhile (this->workers.size() > pool_size&& !this->workers.back().joinable())this->workers.pop_back();// if this is was last worker, notify the destructorif (this->workers.empty())this->condition_consumers.notify_all();return;}else if (!this->tasks.empty()){task = std::move(this->tasks.front());this->tasks.pop();notify = this->tasks.size() + 1 == max_queue_size|| this->tasks.empty();}elsecontinue;}handle_in_flight_decrement guard(*this);if (notify){std::unique_lock<std::mutex> lock(this->queue_mutex);condition_producers.notify_all();}task();}};if (worker_number < this->workers.size()) {std::thread & worker = this->workers[worker_number];// start only if not already runningif (!worker.joinable()) {worker = std::thread(worker_func);}}elsethis->workers.push_back(std::thread(worker_func));}} // namespace progschj#endif // THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
Demo
#include <iostream>
#include <vector>
#include <chrono>#include "ThreadPool.h"using namespace progschj;int main()
{ThreadPool pool;std::vector< std::future<int> > results;for(int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "hello " << i << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "world " << i << std::endl;return i*i;}));}pool.wait_until_empty();pool.wait_until_nothing_in_flight ();for(auto && result: results)std::cout << result.get() << ' ';std::cout << std::endl;return 0;
}
这两句代码是可以保证所有线程都收回时再进行下一步。
pool.wait_until_empty();
pool.wait_until_nothing_in_flight ();
上述代码是引用于下面这个代码:
Github: https://github.com/progschj/ThreadPool
有人对这个代码做简单的解释,可以参考:
http://www.nodekey.com/threadpool-e6-b3-a8-e8-a7-a3/
使用线程池加速的时候可以用Lambda表达,也可以这么写:
mpool.enqueue(Function_name, 函数参数1, 参数2, 参数3);
但是如果Function是成员函数,那么就会出现下面这个恶心的问题:
non-standard syntax; use '&' to create a pointer to member
具体解释可以参考:
https://www.cnblogs.com/blog-vincent-0x1F7/p/9668533.html
https://linustechtips.com/topic/772287-unable-to-use-stdthread-part-2/
解决办法是:
有两种方法可以实现调用类成员,一种是使用 bind: .enqueue(std::bind(&Dog::sayHello, &dog));一种是用 mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this)
基于C++实现线程池加速相关推荐
- 基于EPOLL+多进程+线程池的server框架设想_程序世界_百度空间
基于EPOLL+多进程+线程池的server框架设想_程序世界_百度空间 基于EPOLL+多进程+线程池的server框架设想 最近,看了几个开源代码的server框架,有了一些自己的想法,把它记下来 ...
- hystrix 源码 线程池隔离_基于hystrix的线程池隔离
hystrix进行资源隔离,其实是提供了一个抽象,叫做command,就是说,你如果要把对某一个依赖服务的所有调用请求,全部隔离在同一份资源池内 对这个依赖服务的所有调用请求,全部走这个资源池内的资源 ...
- 学习笔记(35续):Python网络编程并发编程-基于gevent及线程池实现的并发套接字通讯
1.基于线程池实现并发套接字通讯:因为套接字涉及地是I/O密集模型,因此使用多线程会有高效率 ''' 服务器 '''#基于线程池完成并发的套接字通讯 from socket import * from ...
- 开源项目SMSS开发指南(二)——基于libevent的线程池
libevent是一套轻量级的网络库,基于事件驱动开发.能够实现多线程的多路复用和注册事件响应.本文将介绍libevent的基本功能以及如何利用libevent开发一个线程池. 一. 使用指南 监听服 ...
- C++ 并发编程(四):基于 Asio 的线程池
目前项目中使用的线程池(详见:http://threadpool.sourceforge...),虽然能用,但是代码复杂且很久没有人维护了. 本文结合 Thread 和 Asio,实现了一个线程池.一 ...
- 基于requests模块的cookie,session和线程池爬取
基于requests模块的cookie,session和线程池爬取 有些时候,我们在使用爬虫程序去爬取一些用户相关信息的数据(爬取张三"人人网"个人主页数据)时,如果使用之前req ...
- Java线程池实现原理及其在美团业务中的实践
来自:美团技术团队 随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池ThreadPoolExecuto ...
- 线程池在美团的最佳实践
随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发人员 ...
- 【有料】Java线程池实现原理及其在美团业务中的实践
随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发人员 ...
最新文章
- 从源码分析DEARGUI之add_text_point
- element ui 中el-input搜索输入框或者普通输入框无法输入的问题讨论
- 拦截器HandlerInterceptor、ResponseBodyAdvice和@ExceptionHandler执行顺序
- C#-利用Marshal类实现序列化
- MapReduce入门2-流量监控
- 工作338:pc重置筛选条件
- QT关于Excel的操作
- 3D建模软件:犀牛Rhino 7.16.22067.13002
- 【control】模型预测控制(MPC)
- 【流程发现算法概述】
- 负反馈放大电路的四种组态
- 基于STM32的简易数码相册
- JVM3-类文件结构
- Win7系统还原,创建还原点,永久保存自定义还原点,不被删除
- 《现代密码学》学习笔记——第三章 分组密码 [三]分组密码的运行模式
- HDU-6441-Find Integer-费马大定理+奇偶数列法则
- 笔记本电脑睡眠或休眠无法唤醒怎么办?
- 监控prometheus-2
- mysql 1264_关于MySQL的1264错误处理及sql_mode设置
- 长短期记忆网络LSTM